Skip to main content

matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32    DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
36    StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
37    event_cache::store::EventCacheStoreLock,
38    media::store::MediaStoreLock,
39    store::{
40        DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
41        WellKnownResponse,
42    },
43    sync::{Notification, RoomUpdates},
44    task_monitor::TaskMonitor,
45};
46use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl_cache::TtlCache};
47#[cfg(feature = "e2e-encryption")]
48use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
49use ruma::{
50    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
51    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
52    api::{
53        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
54        client::{
55            account::whoami,
56            alias::{create_alias, delete_alias, get_alias},
57            authenticated_media,
58            device::{self, delete_devices, get_devices, update_device},
59            directory::{get_public_rooms, get_public_rooms_filtered},
60            discovery::{
61                discover_homeserver::{self, RtcFocusInfo},
62                get_capabilities::{self, v3::Capabilities},
63                get_supported_versions,
64            },
65            error::{ErrorKind, UnknownTokenErrorData},
66            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
67            knock::knock_room,
68            media,
69            membership::{join_room_by_id, join_room_by_id_or_alias},
70            room::create_room,
71            session::login::v3::DiscoveryInfo,
72            sync::sync_events,
73            threads::get_thread_subscriptions_changes,
74            uiaa,
75            user_directory::search_users,
76        },
77        error::FromHttpResponseError,
78        path_builder::PathBuilder,
79    },
80    assign,
81    events::direct::DirectUserIdentifier,
82    push::Ruleset,
83    time::Instant,
84};
85use serde::de::DeserializeOwned;
86use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
87use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
88use url::Url;
89
90use self::{
91    caches::{CachedValue, ClientCaches},
92    futures::SendRequest,
93};
94use crate::{
95    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
96    Room, SessionTokens, TransmissionProgress,
97    authentication::{
98        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
99        oauth::OAuth,
100    },
101    client::thread_subscriptions::ThreadSubscriptionCatchup,
102    config::{RequestConfig, SyncToken},
103    deduplicating_handler::DeduplicatingHandler,
104    error::HttpResult,
105    event_cache::EventCache,
106    event_handler::{
107        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
108        EventHandlerStore, ObservableEventHandler, SyncEvent,
109    },
110    http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
111    latest_events::LatestEvents,
112    media::MediaError,
113    notification_settings::NotificationSettings,
114    room::RoomMember,
115    room_preview::RoomPreview,
116    send_queue::{SendQueue, SendQueueData},
117    sliding_sync::Version as SlidingSyncVersion,
118    sync::{RoomUpdate, SyncResponse},
119};
120#[cfg(feature = "e2e-encryption")]
121use crate::{
122    cross_process_lock::CrossProcessLock,
123    encryption::{
124        DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
125        VerificationState,
126    },
127};
128
129mod builder;
130pub(crate) mod caches;
131pub(crate) mod futures;
132pub(crate) mod thread_subscriptions;
133
134pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
135#[cfg(feature = "experimental-search")]
136use crate::search_index::SearchIndex;
137
/// Boxed, pinned future returned by a notification handler
/// ([`NotificationHandlerFn`]).
///
/// On every target except wasm the future must also be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a notification handler
/// ([`NotificationHandlerFn`]).
///
/// This is the wasm variant, which carries no `Send` bound.
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
142
/// Boxed notification handler, as stored in
/// `ClientInner::notification_handlers`.
///
/// The handler is called with a [`Notification`], a [`Room`] and a [`Client`],
/// and returns a [`NotificationHandlerFut`]. On every target except wasm the
/// handler must also be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler, as stored in
/// `ClientInner::notification_handlers`.
///
/// This is the wasm variant, which carries no `Send + Sync` bounds.
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
148
/// Enum controlling whether a loop running callbacks should continue or
/// abort.
///
/// This is mainly used by the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop is exited.
///
/// [`sync_with_callback`]: Client::sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
162
/// Represents the changes that can occur to the session of a [`Client`].
#[derive(Debug, Clone, PartialEq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    ///
    /// Carries the [`UnknownTokenErrorData`] describing the error.
    UnknownToken(UnknownTokenErrorData),
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
171
/// Information about the server vendor obtained from the federation API.
///
/// When produced by `Client::server_vendor_info`, a field the server did not
/// report is set to the string `"unknown"`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The name of the server software.
    pub server_name: String,
    /// The version of the server software.
    pub version: String,
}
181
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
#[derive(Clone)]
pub struct Client {
    /// The shared state of the client; cloning a `Client` only clones this
    /// `Arc`.
    pub(crate) inner: Arc<ClientInner>,
}
189
/// Locks and deduplicating handlers used by individual client methods.
///
/// They either ensure that only a single call to a method happens at once, or
/// deduplicate multiple concurrent calls to a method.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// The cross-process lock over the crypto store.
    ///
    /// It lives in a `OnceCell`, so it starts out empty and can be set at
    /// most once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
268
/// The shared state behind a [`Client`].
///
/// A `Client` holds this struct in an `Arc`, so every clone of a `Client`
/// points at the same `ClientInner`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with [`Self::homeserver`]. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process lock configuration.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
    /// [`CrossProcessLockConfig::MultiProcess`] is used.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_lock_config: CrossProcessLockConfig,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates, keyed by
    /// room.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    #[cfg(feature = "experimental-search")]
    /// Handler for [`RoomIndex`]'s of each room
    search_index: SearchIndex,

    /// A monitor for background tasks spawned by the client.
    pub(crate) task_monitor: TaskMonitor,

    /// A sender to notify subscribers about duplicate key upload errors
    /// triggered by requests to /keys/upload.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) duplicate_key_upload_error_sender:
        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
}
397
398impl ClientInner {
399    /// Create a new `ClientInner`.
400    ///
401    /// All the fields passed as parameters here are those that must be cloned
402    /// upon instantiation of a sub-client, e.g. a client specialized for
403    /// notifications.
404    #[allow(clippy::too_many_arguments)]
405    async fn new(
406        auth_ctx: Arc<AuthCtx>,
407        server: Option<Url>,
408        homeserver: Url,
409        sliding_sync_version: SlidingSyncVersion,
410        http_client: HttpClient,
411        base_client: BaseClient,
412        supported_versions: CachedValue<SupportedVersions>,
413        well_known: CachedValue<Option<WellKnownResponse>>,
414        respect_login_well_known: bool,
415        event_cache: OnceCell<EventCache>,
416        send_queue: Arc<SendQueueData>,
417        latest_events: OnceCell<LatestEvents>,
418        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
419        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
420        cross_process_lock_config: CrossProcessLockConfig,
421        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
422        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
423    ) -> Arc<Self> {
424        let caches = ClientCaches {
425            supported_versions: supported_versions.into(),
426            well_known: well_known.into(),
427            server_metadata: Mutex::new(TtlCache::new()),
428        };
429
430        let client = Self {
431            server,
432            homeserver: StdRwLock::new(homeserver),
433            auth_ctx,
434            sliding_sync_version: StdRwLock::new(sliding_sync_version),
435            http_client,
436            base_client,
437            caches,
438            locks: Default::default(),
439            cross_process_lock_config,
440            typing_notice_times: Default::default(),
441            event_handlers: Default::default(),
442            notification_handlers: Default::default(),
443            room_update_channels: Default::default(),
444            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
445            // ballast for all observers to catch up.
446            room_updates_sender: broadcast::Sender::new(32),
447            respect_login_well_known,
448            sync_beat: event_listener::Event::new(),
449            event_cache,
450            send_queue_data: send_queue,
451            latest_events,
452            #[cfg(feature = "e2e-encryption")]
453            e2ee: EncryptionData::new(encryption_settings),
454            #[cfg(feature = "e2e-encryption")]
455            verification_state: SharedObservable::new(VerificationState::Unknown),
456            #[cfg(feature = "e2e-encryption")]
457            enable_share_history_on_invite,
458            server_max_upload_size: Mutex::new(OnceCell::new()),
459            #[cfg(feature = "experimental-search")]
460            search_index: search_index_handler,
461            thread_subscription_catchup,
462            task_monitor: TaskMonitor::new(),
463            #[cfg(feature = "e2e-encryption")]
464            duplicate_key_upload_error_sender: broadcast::channel(1).0,
465        };
466
467        #[allow(clippy::let_and_return)]
468        let client = Arc::new(client);
469
470        #[cfg(feature = "e2e-encryption")]
471        client.e2ee.initialize_tasks(&client);
472
473        let _ = client
474            .event_cache
475            .get_or_init(|| async {
476                EventCache::new(&client, client.base_client.event_cache_store().clone())
477            })
478            .await;
479
480        let _ = client
481            .thread_subscription_catchup
482            .get_or_init(|| async {
483                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
484            })
485            .await;
486
487        client
488    }
489}
490
491#[cfg(not(tarpaulin_include))]
492impl Debug for Client {
493    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
494        write!(fmt, "Client")
495    }
496}
497
498impl Client {
499    /// Create a new [`Client`] that will use the given homeserver.
500    ///
501    /// # Arguments
502    ///
503    /// * `homeserver_url` - The homeserver that the client should connect to.
504    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
505        Self::builder().homeserver_url(homeserver_url).build().await
506    }
507
508    /// Returns a subscriber that publishes an event every time the ignore user
509    /// list changes.
510    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
511        self.inner.base_client.subscribe_to_ignore_user_list_changes()
512    }
513
    /// Create a new [`ClientBuilder`], the entry point for configuring and
    /// building a [`Client`].
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
518
    /// The [`BaseClient`] held by this client.
    pub(crate) fn base_client(&self) -> &BaseClient {
        &self.inner.base_client
    }
522
    /// The underlying HTTP client.
    ///
    /// This is the [`reqwest::Client`] wrapped by the SDK's own HTTP client.
    pub fn http_client(&self) -> &reqwest::Client {
        &self.inner.http_client.inner
    }
527
    /// The collection of locks and deduplicating handlers used by the methods
    /// of this client.
    pub(crate) fn locks(&self) -> &ClientLocks {
        &self.inner.locks
    }
531
    /// The data related to authentication and authorization.
    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
        &self.inner.auth_ctx
    }
535
    /// The cross-process store lock configuration used by this [`Client`].
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
    /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
    /// the value used for all cross-process store locks used by this
    /// `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this value
    /// MUST be different for each of them.
    pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
        &self.inner.cross_process_lock_config
    }
546
547    /// Change the homeserver URL used by this client.
548    ///
549    /// # Arguments
550    ///
551    /// * `homeserver_url` - The new URL to use.
552    fn set_homeserver(&self, homeserver_url: Url) {
553        *self.inner.homeserver.write().unwrap() = homeserver_url;
554    }
555
556    /// Change to a different homeserver and re-resolve well-known.
557    #[cfg(feature = "e2e-encryption")]
558    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
559        &self,
560        homeserver_url: Url,
561    ) -> Result<()> {
562        self.set_homeserver(homeserver_url);
563        self.reset_well_known().await?;
564        if let Some(well_known) = self.load_or_fetch_well_known().await? {
565            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
566        }
567        Ok(())
568    }
569
570    /// Get the capabilities of the homeserver.
571    ///
572    /// This method should be used to check what features are supported by the
573    /// homeserver.
574    ///
575    /// # Examples
576    ///
577    /// ```no_run
578    /// # use matrix_sdk::Client;
579    /// # use url::Url;
580    /// # async {
581    /// # let homeserver = Url::parse("http://example.com")?;
582    /// let client = Client::new(homeserver).await?;
583    ///
584    /// let capabilities = client.get_capabilities().await?;
585    ///
586    /// if capabilities.change_password.enabled {
587    ///     // Change password
588    /// }
589    /// # anyhow::Ok(()) };
590    /// ```
591    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
592        let res = self.send(get_capabilities::v3::Request::new()).await?;
593        Ok(res.capabilities)
594    }
595
596    /// Get the server vendor information from the federation API.
597    ///
598    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
599    /// both the server's software name and version.
600    ///
601    /// # Examples
602    ///
603    /// ```no_run
604    /// # use matrix_sdk::Client;
605    /// # use url::Url;
606    /// # async {
607    /// # let homeserver = Url::parse("http://example.com")?;
608    /// let client = Client::new(homeserver).await?;
609    ///
610    /// let server_info = client.server_vendor_info(None).await?;
611    /// println!(
612    ///     "Server: {}, Version: {}",
613    ///     server_info.server_name, server_info.version
614    /// );
615    /// # anyhow::Ok(()) };
616    /// ```
617    #[cfg(feature = "federation-api")]
618    pub async fn server_vendor_info(
619        &self,
620        request_config: Option<RequestConfig>,
621    ) -> HttpResult<ServerVendorInfo> {
622        use ruma::api::federation::discovery::get_server_version;
623
624        let res = self
625            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
626            .await?;
627
628        // Extract server info, using defaults if fields are missing.
629        let server = res.server.unwrap_or_default();
630        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
631        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
632
633        Ok(ServerVendorInfo { server_name: server_name_str, version })
634    }
635
    /// Get a copy of the default request config.
    ///
    /// The default request config is what's used when sending requests if no
    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
    /// function with such a parameter.
    ///
    /// If the default request config was not customized through
    /// [`ClientBuilder`] when creating this `Client`, the returned value will
    /// be equivalent to [`RequestConfig::default()`].
    ///
    /// The config is returned by value, so modifying it has no effect on this
    /// `Client`.
    pub fn request_config(&self) -> RequestConfig {
        self.inner.http_client.request_config
    }
648
649    /// Check whether the client has been activated.
650    ///
651    /// A client is considered active when:
652    ///
653    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
654    ///    is logged in,
655    /// 2. Has loaded cached data from storage,
656    /// 3. If encryption is enabled, it also initialized or restored its
657    ///    `OlmMachine`.
658    pub fn is_active(&self) -> bool {
659        self.inner.base_client.is_active()
660    }
661
    /// The server used by the client.
    ///
    /// Not to be confused with the [homeserver][Self::homeserver]: the server
    /// is usually the server part of a user ID (e.g. `matrix.org` in
    /// `@mnt_io:matrix.org`), whilst the homeserver is the URL of the
    /// client-server Matrix API.
    ///
    /// Returns `None` if the `Client` has been built from a homeserver URL
    /// directly, in which case the server is not known.
    pub fn server(&self) -> Option<&Url> {
        self.inner.server.as_ref()
    }
668
669    /// The homeserver of the client.
670    pub fn homeserver(&self) -> Url {
671        self.inner.homeserver.read().unwrap().clone()
672    }
673
674    /// Get the sliding sync version.
675    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
676        self.inner.sliding_sync_version.read().unwrap().clone()
677    }
678
679    /// Override the sliding sync version.
680    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
681        let mut lock = self.inner.sliding_sync_version.write().unwrap();
682        *lock = version;
683    }
684
685    /// Get the Matrix user session meta information.
686    ///
687    /// If the client is currently logged in, this will return a
688    /// [`SessionMeta`] object which contains the user ID and device ID.
689    /// Otherwise it returns `None`.
690    pub fn session_meta(&self) -> Option<&SessionMeta> {
691        self.base_client().session_meta()
692    }
693
694    /// Returns a receiver that gets events for each room info update. To watch
695    /// for new events, use `receiver.resubscribe()`.
696    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
697        self.base_client().room_info_notable_update_receiver()
698    }
699
700    /// Performs a search for users.
701    /// The search is performed case-insensitively on user IDs and display names
702    ///
703    /// # Arguments
704    ///
705    /// * `search_term` - The search term for the search
706    /// * `limit` - The maximum number of results to return. Defaults to 10.
707    ///
708    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
709    pub async fn search_users(
710        &self,
711        search_term: &str,
712        limit: u64,
713    ) -> HttpResult<search_users::v3::Response> {
714        let mut request = search_users::v3::Request::new(search_term.to_owned());
715
716        if let Some(limit) = UInt::new(limit) {
717            request.limit = limit;
718        }
719
720        self.send(request).await
721    }
722
723    /// Get the user id of the current owner of the client.
724    pub fn user_id(&self) -> Option<&UserId> {
725        self.session_meta().map(|s| s.user_id.as_ref())
726    }
727
728    /// Get the device ID that identifies the current session.
729    pub fn device_id(&self) -> Option<&DeviceId> {
730        self.session_meta().map(|s| s.device_id.as_ref())
731    }
732
733    /// Get the current access token for this session.
734    ///
735    /// Will be `None` if the client has not been logged in.
736    pub fn access_token(&self) -> Option<String> {
737        self.auth_ctx().access_token()
738    }
739
740    /// Get the current tokens for this session.
741    ///
742    /// To be notified of changes in the session tokens, use
743    /// [`Client::subscribe_to_session_changes()`] or
744    /// [`Client::set_session_callbacks()`].
745    ///
746    /// Returns `None` if the client has not been logged in.
747    pub fn session_tokens(&self) -> Option<SessionTokens> {
748        self.auth_ctx().session_tokens()
749    }
750
751    /// Access the authentication API used to log in this client.
752    ///
753    /// Will be `None` if the client has not been logged in.
754    pub fn auth_api(&self) -> Option<AuthApi> {
755        match self.auth_ctx().auth_data.get()? {
756            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
757            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
758        }
759    }
760
761    /// Get the whole session info of this client.
762    ///
763    /// Will be `None` if the client has not been logged in.
764    ///
765    /// Can be used with [`Client::restore_session`] to restore a previously
766    /// logged-in session.
767    pub fn session(&self) -> Option<AuthSession> {
768        match self.auth_api()? {
769            AuthApi::Matrix(api) => api.session().map(Into::into),
770            AuthApi::OAuth(api) => api.full_session().map(Into::into),
771        }
772    }
773
774    /// Get a reference to the state store.
775    pub fn state_store(&self) -> &DynStateStore {
776        self.base_client().state_store()
777    }
778
779    /// Get a reference to the event cache store.
780    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
781        self.base_client().event_cache_store()
782    }
783
784    /// Get a reference to the media store.
785    pub fn media_store(&self) -> &MediaStoreLock {
786        self.base_client().media_store()
787    }
788
    /// Access the native Matrix authentication API with this client.
    ///
    /// The returned [`MatrixAuth`] is built from a clone of this `Client`.
    pub fn matrix_auth(&self) -> MatrixAuth {
        MatrixAuth::new(self.clone())
    }
793
794    /// Get the account of the current owner of the client.
795    pub fn account(&self) -> Account {
796        Account::new(self.clone())
797    }
798
799    /// Get the encryption manager of the client.
800    #[cfg(feature = "e2e-encryption")]
801    pub fn encryption(&self) -> Encryption {
802        Encryption::new(self.clone())
803    }
804
805    /// Get the media manager of the client.
806    pub fn media(&self) -> Media {
807        Media::new(self.clone())
808    }
809
810    /// Get the pusher manager of the client.
811    pub fn pusher(&self) -> Pusher {
812        Pusher::new(self.clone())
813    }
814
815    /// Access the OAuth 2.0 API of the client.
816    pub fn oauth(&self) -> OAuth {
817        OAuth::new(self.clone())
818    }
819
820    /// Register a handler for a specific event type.
821    ///
822    /// The handler is a function or closure with one or more arguments. The
823    /// first argument is the event itself. All additional arguments are
824    /// "context" arguments: They have to implement [`EventHandlerContext`].
825    /// This trait is named that way because most of the types implementing it
826    /// give additional context about an event: The room it was in, its raw form
827    /// and other similar things. As two exceptions to this,
828    /// [`Client`] and [`EventHandlerHandle`] also implement the
829    /// `EventHandlerContext` trait so you don't have to clone your client
830    /// into the event handler manually and a handler can decide to remove
831    /// itself.
832    ///
833    /// Some context arguments are not universally applicable. A context
834    /// argument that isn't available for the given event type will result in
835    /// the event handler being skipped and an error being logged. The following
836    /// context argument types are only available for a subset of event types:
837    ///
838    /// * [`Room`] is only available for room-specific events, i.e. not for
839    ///   events like global account data events or presence events.
840    ///
841    /// You can provide custom context via
842    /// [`add_event_handler_context`](Client::add_event_handler_context) and
843    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
844    /// into the event handler.
845    ///
846    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
847    ///
848    /// # Examples
849    ///
850    /// ```no_run
851    /// use matrix_sdk::{
852    ///     deserialized_responses::EncryptionInfo,
853    ///     event_handler::Ctx,
854    ///     ruma::{
855    ///         events::{
856    ///             macros::EventContent,
857    ///             push_rules::PushRulesEvent,
858    ///             room::{
859    ///                 message::SyncRoomMessageEvent,
860    ///                 topic::SyncRoomTopicEvent,
861    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
862    ///             },
863    ///         },
864    ///         push::Action,
865    ///         Int, MilliSecondsSinceUnixEpoch,
866    ///     },
867    ///     Client, Room,
868    /// };
869    /// use serde::{Deserialize, Serialize};
870    ///
871    /// # async fn example(client: Client) {
872    /// client.add_event_handler(
873    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
874    ///         // Common usage: Room event plus room and client.
875    ///     },
876    /// );
877    /// client.add_event_handler(
878    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
879    ///         async move {
880    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
881    ///             // unencrypted events and events that were decrypted by the SDK.
882    ///         }
883    ///     },
884    /// );
885    /// client.add_event_handler(
886    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
887    ///         async move {
888    ///             // A `Vec<Action>` parameter allows you to know which push actions
889    ///             // are applicable for an event. For example, an event with
890    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
891    ///             // in the timeline.
892    ///         }
893    ///     },
894    /// );
895    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
896    ///     // You can omit any or all arguments after the first.
897    /// });
898    ///
899    /// // Registering a temporary event handler:
900    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
901    ///     /* Event handler */
902    /// });
903    /// client.remove_event_handler(handle);
904    ///
905    /// // Registering custom event handler context:
906    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
907    /// struct MyContext {
908    ///     number: usize,
909    /// }
910    /// client.add_event_handler_context(MyContext { number: 5 });
911    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
912    ///     // Use the context
913    /// });
914    ///
915    /// // This will handle membership events in joined rooms. Invites are special, see below.
916    /// client.add_event_handler(
917    ///     |ev: SyncRoomMemberEvent| async move {},
918    /// );
919    ///
920    /// // To handle state events in invited rooms (including invite membership events),
921    /// // `StrippedRoomMemberEvent` should be used.
922    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
923    /// client.add_event_handler(
924    ///     |ev: StrippedRoomMemberEvent| async move {},
925    /// );
926    ///
927    /// // Custom events work exactly the same way, you just need to declare
928    /// // the content struct and use the EventContent derive macro on it.
929    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
930    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
931    /// struct TokenEventContent {
932    ///     token: String,
933    ///     #[serde(rename = "exp")]
934    ///     expires_at: MilliSecondsSinceUnixEpoch,
935    /// }
936    ///
937    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
938    ///     todo!("Display the token");
939    /// });
940    ///
941    /// // Event handler closures can also capture local variables.
942    /// // Make sure they are cheap to clone though, because they will be cloned
943    /// // every time the closure is called.
944    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
945    ///
946    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
947    ///     println!("Calling the handler with identifier {data}");
948    /// });
949    /// # }
950    /// ```
951    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
952    where
953        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
954        H: EventHandler<Ev, Ctx>,
955    {
956        self.add_event_handler_impl(handler, None)
957    }
958
959    /// Register a handler for a specific room, and event type.
960    ///
961    /// This method works the same way as
962    /// [`add_event_handler`][Self::add_event_handler], except that the handler
963    /// will only be called for events in the room with the specified ID. See
964    /// that method for more details on event handler functions.
965    ///
966    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
967    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
968    /// your use case.
969    pub fn add_room_event_handler<Ev, Ctx, H>(
970        &self,
971        room_id: &RoomId,
972        handler: H,
973    ) -> EventHandlerHandle
974    where
975        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
976        H: EventHandler<Ev, Ctx>,
977    {
978        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
979    }
980
981    /// Observe a specific event type.
982    ///
983    /// `Ev` represents the kind of event that will be observed. `Ctx`
984    /// represents the context that will come with the event. It relies on the
985    /// same mechanism as [`Client::add_event_handler`]. The main difference is
986    /// that it returns an [`ObservableEventHandler`] and doesn't require a
987    /// user-defined closure. It is possible to subscribe to the
988    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
989    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
990    /// Ctx)`.
991    ///
992    /// Be careful that only the most recent value can be observed. Subscribers
993    /// are notified when a new value is sent, but there is no guarantee
994    /// that they will see all values.
995    ///
996    /// # Example
997    ///
998    /// Let's see a classical usage:
999    ///
1000    /// ```
1001    /// use futures_util::StreamExt as _;
1002    /// use matrix_sdk::{
1003    ///     Client, Room,
1004    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
1005    /// };
1006    ///
1007    /// # async fn example(client: Client) -> Option<()> {
1008    /// let observer =
1009    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1010    ///
1011    /// let mut subscriber = observer.subscribe();
1012    ///
1013    /// let (event, (room, push_actions)) = subscriber.next().await?;
1014    /// # Some(())
1015    /// # }
1016    /// ```
1017    ///
1018    /// Now let's see how to get several contexts that can be useful for you:
1019    ///
1020    /// ```
1021    /// use matrix_sdk::{
1022    ///     Client, Room,
1023    ///     deserialized_responses::EncryptionInfo,
1024    ///     ruma::{
1025    ///         events::room::{
1026    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1027    ///         },
1028    ///         push::Action,
1029    ///     },
1030    /// };
1031    ///
1032    /// # async fn example(client: Client) {
1033    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1034    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1035    ///
1036    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1037    /// // to distinguish between unencrypted events and events that were decrypted
1038    /// // by the SDK.
1039    /// let _ = client
1040    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1041    ///     );
1042    ///
1043    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1044    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1045    /// // should be highlighted in the timeline.
1046    /// let _ =
1047    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1048    ///
1049    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1050    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1051    /// # }
1052    /// ```
1053    ///
1054    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1055    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1056    where
1057        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1058        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1059    {
1060        self.observe_room_events_impl(None)
1061    }
1062
1063    /// Observe a specific room, and event type.
1064    ///
1065    /// This method works the same way as [`Client::observe_events`], except
1066    /// that the observability will only be applied for events in the room with
1067    /// the specified ID. See that method for more details.
1068    ///
1069    /// Be careful that only the most recent value can be observed. Subscribers
1070    /// are notified when a new value is sent, but there is no guarantee
1071    /// that they will see all values.
1072    pub fn observe_room_events<Ev, Ctx>(
1073        &self,
1074        room_id: &RoomId,
1075    ) -> ObservableEventHandler<(Ev, Ctx)>
1076    where
1077        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1078        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1079    {
1080        self.observe_room_events_impl(Some(room_id.to_owned()))
1081    }
1082
1083    /// Shared implementation for `Client::observe_events` and
1084    /// `Client::observe_room_events`.
1085    fn observe_room_events_impl<Ev, Ctx>(
1086        &self,
1087        room_id: Option<OwnedRoomId>,
1088    ) -> ObservableEventHandler<(Ev, Ctx)>
1089    where
1090        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1091        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1092    {
1093        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1094        // new value.
1095        let shared_observable = SharedObservable::new(None);
1096
1097        ObservableEventHandler::new(
1098            shared_observable.clone(),
1099            self.event_handler_drop_guard(self.add_event_handler_impl(
1100                move |event: Ev, context: Ctx| {
1101                    shared_observable.set(Some((event, context)));
1102
1103                    ready(())
1104                },
1105                room_id,
1106            )),
1107        )
1108    }
1109
1110    /// Remove the event handler associated with the handle.
1111    ///
1112    /// Note that you **must not** call `remove_event_handler` from the
1113    /// non-async part of an event handler, that is:
1114    ///
1115    /// ```ignore
1116    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1117    ///     // ⚠ this will cause a deadlock ⚠
1118    ///     client.remove_event_handler(handle);
1119    ///
1120    ///     async move {
1121    ///         // removing the event handler here is fine
1122    ///         client.remove_event_handler(handle);
1123    ///     }
1124    /// })
1125    /// ```
1126    ///
1127    /// Note also that handlers that remove themselves will still execute with
1128    /// events received in the same sync cycle.
1129    ///
1130    /// # Arguments
1131    ///
1132    /// `handle` - The [`EventHandlerHandle`] that is returned when
1133    /// registering the event handler with [`Client::add_event_handler`].
1134    ///
1135    /// # Examples
1136    ///
1137    /// ```no_run
1138    /// # use url::Url;
1139    /// # use tokio::sync::mpsc;
1140    /// #
1141    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1142    /// #
1143    /// use matrix_sdk::{
1144    ///     Client, event_handler::EventHandlerHandle,
1145    ///     ruma::events::room::member::SyncRoomMemberEvent,
1146    /// };
1147    /// #
1148    /// # futures_executor::block_on(async {
1149    /// # let client = matrix_sdk::Client::builder()
1150    /// #     .homeserver_url(homeserver)
1151    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1152    /// #     .build()
1153    /// #     .await
1154    /// #     .unwrap();
1155    ///
1156    /// client.add_event_handler(
1157    ///     |ev: SyncRoomMemberEvent,
1158    ///      client: Client,
1159    ///      handle: EventHandlerHandle| async move {
1160    ///         // Common usage: Check arriving Event is the expected one
1161    ///         println!("Expected RoomMemberEvent received!");
1162    ///         client.remove_event_handler(handle);
1163    ///     },
1164    /// );
1165    /// # });
1166    /// ```
1167    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1168        self.inner.event_handlers.remove(handle);
1169    }
1170
1171    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1172    /// the given handle.
1173    ///
1174    /// When the returned value is dropped, the event handler will be removed.
1175    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1176        EventHandlerDropGuard::new(handle, self.clone())
1177    }
1178
1179    /// Add an arbitrary value for use as event handler context.
1180    ///
1181    /// The value can be obtained in an event handler by adding an argument of
1182    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1183    ///
1184    /// If a value of the same type has been added before, it will be
1185    /// overwritten.
1186    ///
1187    /// # Examples
1188    ///
1189    /// ```no_run
1190    /// use matrix_sdk::{
1191    ///     Room, event_handler::Ctx,
1192    ///     ruma::events::room::message::SyncRoomMessageEvent,
1193    /// };
1194    /// # #[derive(Clone)]
1195    /// # struct SomeType;
1196    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1197    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1198    /// # futures_executor::block_on(async {
1199    /// # let client = matrix_sdk::Client::builder()
1200    /// #     .homeserver_url(homeserver)
1201    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1202    /// #     .build()
1203    /// #     .await
1204    /// #     .unwrap();
1205    ///
1206    /// // Handle used to send messages to the UI part of the app
1207    /// let my_gui_handle: SomeType = obtain_gui_handle();
1208    ///
1209    /// client.add_event_handler_context(my_gui_handle.clone());
1210    /// client.add_event_handler(
1211    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1212    ///         async move {
1213    ///             // gui_handle.send(DisplayMessage { message: ev });
1214    ///         }
1215    ///     },
1216    /// );
1217    /// # });
1218    /// ```
1219    pub fn add_event_handler_context<T>(&self, ctx: T)
1220    where
1221        T: Clone + Send + Sync + 'static,
1222    {
1223        self.inner.event_handlers.add_context(ctx);
1224    }
1225
1226    /// Register a handler for a notification.
1227    ///
1228    /// Similar to [`Client::add_event_handler`], but only allows functions
1229    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1230    /// [`Client`] for now.
1231    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1232    where
1233        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1234        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1235    {
1236        self.inner.notification_handlers.write().await.push(Box::new(
1237            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1238        ));
1239
1240        self
1241    }
1242
1243    /// Subscribe to all updates for the room with the given ID.
1244    ///
1245    /// The returned receiver will receive a new message for each sync response
1246    /// that contains updates for that room.
1247    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1248        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1249            btree_map::Entry::Vacant(entry) => {
1250                let (tx, rx) = broadcast::channel(8);
1251                entry.insert(tx);
1252                rx
1253            }
1254            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1255        }
1256    }
1257
1258    /// Subscribe to all updates to all rooms, whenever any has been received in
1259    /// a sync response.
1260    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1261        self.inner.room_updates_sender.subscribe()
1262    }
1263
1264    pub(crate) async fn notification_handlers(
1265        &self,
1266    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1267        self.inner.notification_handlers.read().await
1268    }
1269
1270    /// Get all the rooms the client knows about.
1271    ///
1272    /// This will return the list of joined, invited, and left rooms.
1273    pub fn rooms(&self) -> Vec<Room> {
1274        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1275    }
1276
1277    /// Get all the rooms the client knows about, filtered by room state.
1278    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1279        self.base_client()
1280            .rooms_filtered(filter)
1281            .into_iter()
1282            .map(|room| Room::new(self.clone(), room))
1283            .collect()
1284    }
1285
1286    /// Get a stream of all the rooms, in addition to the existing rooms.
1287    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1288        let (rooms, stream) = self.base_client().rooms_stream();
1289
1290        let map_room = |room| Room::new(self.clone(), room);
1291
1292        (
1293            rooms.into_iter().map(map_room).collect(),
1294            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1295        )
1296    }
1297
1298    /// Returns the joined rooms this client knows about.
1299    pub fn joined_rooms(&self) -> Vec<Room> {
1300        self.rooms_filtered(RoomStateFilter::JOINED)
1301    }
1302
1303    /// Returns the invited rooms this client knows about.
1304    pub fn invited_rooms(&self) -> Vec<Room> {
1305        self.rooms_filtered(RoomStateFilter::INVITED)
1306    }
1307
1308    /// Returns the left rooms this client knows about.
1309    pub fn left_rooms(&self) -> Vec<Room> {
1310        self.rooms_filtered(RoomStateFilter::LEFT)
1311    }
1312
1313    /// Returns the joined space rooms this client knows about.
1314    pub fn joined_space_rooms(&self) -> Vec<Room> {
1315        self.base_client()
1316            .rooms_filtered(RoomStateFilter::JOINED)
1317            .into_iter()
1318            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1319            .collect()
1320    }
1321
1322    /// Get a room with the given room id.
1323    ///
1324    /// # Arguments
1325    ///
1326    /// `room_id` - The unique id of the room that should be fetched.
1327    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1328        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1329    }
1330
1331    /// Gets the preview of a room, whether the current user has joined it or
1332    /// not.
1333    pub async fn get_room_preview(
1334        &self,
1335        room_or_alias_id: &RoomOrAliasId,
1336        via: Vec<OwnedServerName>,
1337    ) -> Result<RoomPreview> {
1338        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1339            Ok(room_id) => room_id.to_owned(),
1340            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1341        };
1342
1343        if let Some(room) = self.get_room(&room_id) {
1344            // The cached data can only be trusted if the room state is joined or
1345            // banned: for invite and knock rooms, no updates will be received
1346            // for the rooms after the invite/knock action took place so we may
1347            // have very out to date data for important fields such as
1348            // `join_rule`. For left rooms, the homeserver should return the latest info.
1349            match room.state() {
1350                RoomState::Joined | RoomState::Banned => {
1351                    return Ok(RoomPreview::from_known_room(&room).await);
1352                }
1353                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1354            }
1355        }
1356
1357        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1358    }
1359
1360    /// Resolve a room alias to a room id and a list of servers which know
1361    /// about it.
1362    ///
1363    /// # Arguments
1364    ///
1365    /// `room_alias` - The room alias to be resolved.
1366    pub async fn resolve_room_alias(
1367        &self,
1368        room_alias: &RoomAliasId,
1369    ) -> HttpResult<get_alias::v3::Response> {
1370        let request = get_alias::v3::Request::new(room_alias.to_owned());
1371        self.send(request).await
1372    }
1373
1374    /// Checks if a room alias is not in use yet.
1375    ///
1376    /// Returns:
1377    /// - `Ok(true)` if the room alias is available.
1378    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1379    ///   status code).
1380    /// - An `Err` otherwise.
1381    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1382        match self.resolve_room_alias(alias).await {
1383            // The room alias was resolved, so it's already in use.
1384            Ok(_) => Ok(false),
1385            Err(error) => {
1386                match error.client_api_error_kind() {
1387                    // The room alias wasn't found, so it's available.
1388                    Some(ErrorKind::NotFound) => Ok(true),
1389                    _ => Err(error),
1390                }
1391            }
1392        }
1393    }
1394
1395    /// Adds a new room alias associated with a room to the room directory.
1396    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1397        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1398        self.send(request).await?;
1399        Ok(())
1400    }
1401
1402    /// Removes a room alias from the room directory.
1403    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1404        let request = delete_alias::v3::Request::new(alias.to_owned());
1405        self.send(request).await?;
1406        Ok(())
1407    }
1408
1409    /// Update the homeserver from the login response well-known if needed.
1410    ///
1411    /// # Arguments
1412    ///
1413    /// * `login_well_known` - The `well_known` field from a successful login
1414    ///   response.
1415    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1416        if self.inner.respect_login_well_known
1417            && let Some(well_known) = login_well_known
1418            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1419        {
1420            self.set_homeserver(homeserver);
1421        }
1422    }
1423
1424    /// Similar to [`Client::restore_session_with`], with
1425    /// [`RoomLoadSettings::default()`].
1426    ///
1427    /// # Panics
1428    ///
1429    /// Panics if a session was already restored or logged in.
1430    #[instrument(skip_all)]
1431    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1432        self.restore_session_with(session, RoomLoadSettings::default()).await
1433    }
1434
1435    /// Restore a session previously logged-in using one of the available
1436    /// authentication APIs. The number of rooms to restore is controlled by
1437    /// [`RoomLoadSettings`].
1438    ///
1439    /// See the documentation of the corresponding authentication API's
1440    /// `restore_session` method for more information.
1441    ///
1442    /// # Panics
1443    ///
1444    /// Panics if a session was already restored or logged in.
1445    #[instrument(skip_all)]
1446    pub async fn restore_session_with(
1447        &self,
1448        session: impl Into<AuthSession>,
1449        room_load_settings: RoomLoadSettings,
1450    ) -> Result<()> {
1451        let session = session.into();
1452        match session {
1453            AuthSession::Matrix(session) => {
1454                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1455            }
1456            AuthSession::OAuth(session) => {
1457                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1458            }
1459        }
1460    }
1461
1462    /// Refresh the access token using the authentication API used to log into
1463    /// this session.
1464    ///
1465    /// See the documentation of the authentication API's `refresh_access_token`
1466    /// method for more information.
1467    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1468        let Some(auth_api) = self.auth_api() else {
1469            return Err(RefreshTokenError::RefreshTokenRequired);
1470        };
1471
1472        match auth_api {
1473            AuthApi::Matrix(api) => {
1474                trace!("Token refresh: Using the homeserver.");
1475                Box::pin(api.refresh_access_token()).await?;
1476            }
1477            AuthApi::OAuth(api) => {
1478                trace!("Token refresh: Using OAuth 2.0.");
1479                Box::pin(api.refresh_access_token()).await?;
1480            }
1481        }
1482
1483        Ok(())
1484    }
1485
1486    /// Log out the current session using the proper authentication API.
1487    ///
1488    /// # Errors
1489    ///
1490    /// Returns an error if the session is not authenticated or if an error
1491    /// occurred while making the request to the server.
1492    pub async fn logout(&self) -> Result<(), Error> {
1493        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1494        match auth_api {
1495            AuthApi::Matrix(matrix_auth) => {
1496                matrix_auth.logout().await?;
1497                Ok(())
1498            }
1499            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1500        }
1501    }
1502
1503    /// Get or upload a sync filter.
1504    ///
1505    /// This method will either get a filter ID from the store or upload the
1506    /// filter definition to the homeserver and return the new filter ID.
1507    ///
1508    /// # Arguments
1509    ///
1510    /// * `filter_name` - The unique name of the filter, this name will be used
1511    /// locally to store and identify the filter ID returned by the server.
1512    ///
1513    /// * `definition` - The filter definition that should be uploaded to the
1514    /// server if no filter ID can be found in the store.
1515    ///
1516    /// # Examples
1517    ///
1518    /// ```no_run
1519    /// # use matrix_sdk::{
1520    /// #    Client, config::SyncSettings,
1521    /// #    ruma::api::client::{
1522    /// #        filter::{
1523    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1524    /// #        },
1525    /// #        sync::sync_events::v3::Filter,
1526    /// #    }
1527    /// # };
1528    /// # use url::Url;
1529    /// # async {
1530    /// # let homeserver = Url::parse("http://example.com").unwrap();
1531    /// # let client = Client::new(homeserver).await.unwrap();
1532    /// let mut filter = FilterDefinition::default();
1533    ///
1534    /// // Let's enable member lazy loading.
1535    /// filter.room.state.lazy_load_options =
1536    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1537    ///
1538    /// let filter_id = client
1539    ///     .get_or_upload_filter("sync", filter)
1540    ///     .await
1541    ///     .unwrap();
1542    ///
1543    /// let sync_settings = SyncSettings::new()
1544    ///     .filter(Filter::FilterId(filter_id));
1545    ///
1546    /// let response = client.sync_once(sync_settings).await.unwrap();
1547    /// # };
1548    #[instrument(skip(self, definition))]
1549    pub async fn get_or_upload_filter(
1550        &self,
1551        filter_name: &str,
1552        definition: FilterDefinition,
1553    ) -> Result<String> {
1554        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1555            debug!("Found filter locally");
1556            Ok(filter)
1557        } else {
1558            debug!("Didn't find filter locally");
1559            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1560            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1561            let response = self.send(request).await?;
1562
1563            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1564
1565            Ok(response.filter_id)
1566        }
1567    }
1568
1569    /// Prepare to join a room by ID, by getting the current details about it
1570    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1571        let room = self.get_room(room_id)?;
1572
1573        let inviter = match room.invite_details().await {
1574            Ok(details) => details.inviter,
1575            Err(Error::WrongRoomState(_)) => None,
1576            Err(e) => {
1577                warn!("Error fetching invite details for room: {e:?}");
1578                None
1579            }
1580        };
1581
1582        Some(PreJoinRoomInfo { inviter })
1583    }
1584
1585    /// Finish joining a room.
1586    ///
1587    /// If the room was an invite that should be marked as a DM, will include it
1588    /// in the DM event after creating the joined room.
1589    ///
1590    /// If encrypted history sharing is enabled, will check to see if we have a
1591    /// key bundle, and import it if so.
1592    ///
1593    /// # Arguments
1594    ///
1595    /// * `room_id` - The `RoomId` of the room that was joined.
1596    /// * `pre_join_room_info` - Information about the room before we joined.
1597    async fn finish_join_room(
1598        &self,
1599        room_id: &RoomId,
1600        pre_join_room_info: Option<PreJoinRoomInfo>,
1601    ) -> Result<Room> {
1602        info!(?room_id, ?pre_join_room_info, "Completing room join");
1603        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1604            room.state() == RoomState::Invited
1605                && room.is_direct().await.unwrap_or_else(|e| {
1606                    warn!(%room_id, "is_direct() failed: {e}");
1607                    false
1608                })
1609        } else {
1610            false
1611        };
1612
1613        let base_room = self
1614            .base_client()
1615            .room_joined(
1616                room_id,
1617                pre_join_room_info
1618                    .as_ref()
1619                    .and_then(|info| info.inviter.as_ref())
1620                    .map(|i| i.user_id().to_owned()),
1621            )
1622            .await?;
1623        let room = Room::new(self.clone(), base_room);
1624
1625        if mark_as_dm {
1626            room.set_is_direct(true).await?;
1627        }
1628
1629        // If we joined following an invite, check if we had previously received a key
1630        // bundle from the inviter, and import it if so.
1631        //
1632        // It's important that we only do this once `BaseClient::room_joined` has
1633        // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1634        // race.
1635        #[cfg(feature = "e2e-encryption")]
1636        if self.inner.enable_share_history_on_invite
1637            && let Some(inviter) =
1638                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1639        {
1640            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1641                .await?;
1642        }
1643
1644        // Suppress "unused variable" and "unused field" lints
1645        #[cfg(not(feature = "e2e-encryption"))]
1646        let _ = pre_join_room_info.map(|i| i.inviter);
1647
1648        Ok(room)
1649    }
1650
1651    /// Join a room by `RoomId`.
1652    ///
1653    /// Returns the `Room` in the joined state.
1654    ///
1655    /// # Arguments
1656    ///
1657    /// * `room_id` - The `RoomId` of the room to be joined.
1658    #[instrument(skip(self))]
1659    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1660        // See who invited us to this room, if anyone. Note we have to do this before
1661        // making the `/join` request, otherwise we could race against the sync.
1662        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1663
1664        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1665        let response = self.send(request).await?;
1666        self.finish_join_room(&response.room_id, pre_join_info).await
1667    }
1668
1669    /// Join a room by `RoomOrAliasId`.
1670    ///
1671    /// Returns the `Room` in the joined state.
1672    ///
1673    /// # Arguments
1674    ///
1675    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1676    ///   alias looks like `#name:example.com`.
1677    /// * `server_names` - The server names to be used for resolving the alias,
1678    ///   if needs be.
1679    #[instrument(skip(self))]
1680    pub async fn join_room_by_id_or_alias(
1681        &self,
1682        alias: &RoomOrAliasId,
1683        server_names: &[OwnedServerName],
1684    ) -> Result<Room> {
1685        let pre_join_info = {
1686            match alias.try_into() {
1687                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1688                Err(_) => {
1689                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1690                    // responding to an invitation to the room, and therefore don't need to handle
1691                    // things that happen as a result of invites.
1692                    None
1693                }
1694            }
1695        };
1696        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1697            via: server_names.to_owned(),
1698        });
1699        let response = self.send(request).await?;
1700        self.finish_join_room(&response.room_id, pre_join_info).await
1701    }
1702
1703    /// Search the homeserver's directory of public rooms.
1704    ///
1705    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1706    /// a `get_public_rooms::Response`.
1707    ///
1708    /// # Arguments
1709    ///
1710    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1711    ///
1712    /// * `since` - Pagination token from a previous request.
1713    ///
1714    /// * `server` - The name of the server, if `None` the requested server is
1715    ///   used.
1716    ///
1717    /// # Examples
1718    /// ```no_run
1719    /// use matrix_sdk::Client;
1720    /// # use url::Url;
1721    /// # let homeserver = Url::parse("http://example.com").unwrap();
1722    /// # let limit = Some(10);
1723    /// # let since = Some("since token");
1724    /// # let server = Some("servername.com".try_into().unwrap());
1725    /// # async {
1726    /// let mut client = Client::new(homeserver).await.unwrap();
1727    ///
1728    /// client.public_rooms(limit, since, server).await;
1729    /// # };
1730    /// ```
1731    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1732    pub async fn public_rooms(
1733        &self,
1734        limit: Option<u32>,
1735        since: Option<&str>,
1736        server: Option<&ServerName>,
1737    ) -> HttpResult<get_public_rooms::v3::Response> {
1738        let limit = limit.map(UInt::from);
1739
1740        let request = assign!(get_public_rooms::v3::Request::new(), {
1741            limit,
1742            since: since.map(ToOwned::to_owned),
1743            server: server.map(ToOwned::to_owned),
1744        });
1745        self.send(request).await
1746    }
1747
1748    /// Create a room with the given parameters.
1749    ///
1750    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1751    /// created room.
1752    ///
1753    /// If you want to create a direct message with one specific user, you can
1754    /// use [`create_dm`][Self::create_dm], which is more convenient than
1755    /// assembling the [`create_room::v3::Request`] yourself.
1756    ///
1757    /// If the `is_direct` field of the request is set to `true` and at least
1758    /// one user is invited, the room will be automatically added to the direct
1759    /// rooms in the account data.
1760    ///
1761    /// # Examples
1762    ///
1763    /// ```no_run
1764    /// use matrix_sdk::{
1765    ///     Client,
1766    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1767    /// };
1768    /// # use url::Url;
1769    /// #
1770    /// # async {
1771    /// # let homeserver = Url::parse("http://example.com").unwrap();
1772    /// let request = CreateRoomRequest::new();
1773    /// let client = Client::new(homeserver).await.unwrap();
1774    /// assert!(client.create_room(request).await.is_ok());
1775    /// # };
1776    /// ```
1777    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1778        let invite = request.invite.clone();
1779        let is_direct_room = request.is_direct;
1780        let response = self.send(request).await?;
1781
1782        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1783
1784        let joined_room = Room::new(self.clone(), base_room);
1785
1786        if is_direct_room
1787            && !invite.is_empty()
1788            && let Err(error) =
1789                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1790        {
1791            // FIXME: Retry in the background
1792            error!("Failed to mark room as DM: {error}");
1793        }
1794
1795        Ok(joined_room)
1796    }
1797
1798    /// Create a DM room.
1799    ///
1800    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1801    /// given user being invited, the room marked `is_direct` and both the
1802    /// creator and invitee getting the default maximum power level.
1803    ///
1804    /// If the `e2e-encryption` feature is enabled, the room will also be
1805    /// encrypted.
1806    ///
1807    /// # Arguments
1808    ///
1809    /// * `user_id` - The ID of the user to create a DM for.
1810    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1811        #[cfg(feature = "e2e-encryption")]
1812        let initial_state = vec![
1813            InitialStateEvent::with_empty_state_key(
1814                RoomEncryptionEventContent::with_recommended_defaults(),
1815            )
1816            .to_raw_any(),
1817        ];
1818
1819        #[cfg(not(feature = "e2e-encryption"))]
1820        let initial_state = vec![];
1821
1822        let request = assign!(create_room::v3::Request::new(), {
1823            invite: vec![user_id.to_owned()],
1824            is_direct: true,
1825            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1826            initial_state,
1827        });
1828
1829        self.create_room(request).await
1830    }
1831
1832    /// Get the existing DM room with the given user, if any.
1833    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1834        let rooms = self.joined_rooms();
1835
1836        // Find the room we share with the `user_id` and only with `user_id`
1837        let room = rooms.into_iter().find(|r| {
1838            let targets = r.direct_targets();
1839            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1840        });
1841
1842        trace!(?user_id, ?room, "Found DM room with user");
1843        room
1844    }
1845
1846    /// Search the homeserver's directory for public rooms with a filter.
1847    ///
1848    /// # Arguments
1849    ///
1850    /// * `room_search` - The easiest way to create this request is using the
1851    ///   `get_public_rooms_filtered::Request` itself.
1852    ///
1853    /// # Examples
1854    ///
1855    /// ```no_run
1856    /// # use url::Url;
1857    /// # use matrix_sdk::Client;
1858    /// # async {
1859    /// # let homeserver = Url::parse("http://example.com")?;
1860    /// use matrix_sdk::ruma::{
1861    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1862    /// };
1863    /// # let mut client = Client::new(homeserver).await?;
1864    ///
1865    /// let mut filter = Filter::new();
1866    /// filter.generic_search_term = Some("rust".to_owned());
1867    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1868    /// request.filter = filter;
1869    ///
1870    /// let response = client.public_rooms_filtered(request).await?;
1871    ///
1872    /// for room in response.chunk {
1873    ///     println!("Found room {room:?}");
1874    /// }
1875    /// # anyhow::Ok(()) };
1876    /// ```
1877    pub async fn public_rooms_filtered(
1878        &self,
1879        request: get_public_rooms_filtered::v3::Request,
1880    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1881        self.send(request).await
1882    }
1883
1884    /// Send an arbitrary request to the server, without updating client state.
1885    ///
1886    /// **Warning:** Because this method *does not* update the client state, it
1887    /// is important to make sure that you account for this yourself, and
1888    /// use wrapper methods where available.  This method should *only* be
1889    /// used if a wrapper method for the endpoint you'd like to use is not
1890    /// available.
1891    ///
1892    /// # Arguments
1893    ///
1894    /// * `request` - A filled out and valid request for the endpoint to be hit
1895    ///
1896    /// * `timeout` - An optional request timeout setting, this overrides the
1897    ///   default request setting if one was set.
1898    ///
1899    /// # Examples
1900    ///
1901    /// ```no_run
1902    /// # use matrix_sdk::{Client, config::SyncSettings};
1903    /// # use url::Url;
1904    /// # async {
1905    /// # let homeserver = Url::parse("http://localhost:8080")?;
1906    /// # let mut client = Client::new(homeserver).await?;
1907    /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1908    ///
1909    /// // First construct the request you want to make
1910    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1911    /// // for all available Endpoints
1912    /// let user_id = owned_user_id!("@example:localhost");
1913    /// let request = profile::get_profile::v3::Request::new(user_id);
1914    ///
1915    /// // Start the request using Client::send()
1916    /// let response = client.send(request).await?;
1917    ///
1918    /// // Check the corresponding Response struct to find out what types are
1919    /// // returned
1920    /// # anyhow::Ok(()) };
1921    /// ```
1922    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1923    where
1924        Request: OutgoingRequest + Clone + Debug,
1925        Request::Authentication: SupportedAuthScheme,
1926        Request::PathBuilder: SupportedPathBuilder,
1927        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1928        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1929    {
1930        SendRequest {
1931            client: self.clone(),
1932            request,
1933            config: None,
1934            send_progress: Default::default(),
1935        }
1936    }
1937
1938    pub(crate) async fn send_inner<Request>(
1939        &self,
1940        request: Request,
1941        config: Option<RequestConfig>,
1942        send_progress: SharedObservable<TransmissionProgress>,
1943    ) -> HttpResult<Request::IncomingResponse>
1944    where
1945        Request: OutgoingRequest + Debug,
1946        Request::Authentication: SupportedAuthScheme,
1947        Request::PathBuilder: SupportedPathBuilder,
1948        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1949        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1950    {
1951        let homeserver = self.homeserver().to_string();
1952        let access_token = self.access_token();
1953        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1954
1955        let path_builder_input =
1956            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1957
1958        let result = self
1959            .inner
1960            .http_client
1961            .send(
1962                request,
1963                config,
1964                homeserver,
1965                access_token.as_deref(),
1966                path_builder_input,
1967                send_progress,
1968            )
1969            .await;
1970
1971        if let Err(Some(ErrorKind::UnknownToken { .. })) =
1972            result.as_ref().map_err(HttpError::client_api_error_kind)
1973            && let Some(access_token) = &access_token
1974        {
1975            // Mark the access token as expired.
1976            self.auth_ctx().set_access_token_expired(access_token);
1977        }
1978
1979        result
1980    }
1981
1982    fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
1983        _ = self
1984            .inner
1985            .auth_ctx
1986            .session_change_sender
1987            .send(SessionChange::UnknownToken(unknown_token_data.clone()));
1988    }
1989
1990    /// Fetches server versions from network; no caching.
1991    pub async fn fetch_server_versions(
1992        &self,
1993        request_config: Option<RequestConfig>,
1994    ) -> HttpResult<get_supported_versions::Response> {
1995        // Since this was called by the user, try to refresh the access token if
1996        // necessary.
1997        self.fetch_server_versions_inner(false, request_config).await
1998    }
1999
2000    /// Fetches server versions from network; no caching.
2001    ///
2002    /// If the access token is expired and `failsafe` is `false`, this will
2003    /// attempt to refresh the access token, otherwise this will try to make an
2004    /// unauthenticated request instead.
2005    pub(crate) async fn fetch_server_versions_inner(
2006        &self,
2007        failsafe: bool,
2008        request_config: Option<RequestConfig>,
2009    ) -> HttpResult<get_supported_versions::Response> {
2010        if !failsafe {
2011            // `Client::send()` handles refreshing access tokens.
2012            return self
2013                .send(get_supported_versions::Request::new())
2014                .with_request_config(request_config)
2015                .await;
2016        }
2017
2018        let homeserver = self.homeserver().to_string();
2019
2020        // If we have a fresh access token, try with it first.
2021        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2022            && self.auth_ctx().has_valid_access_token()
2023            && let Some(access_token) = self.access_token()
2024        {
2025            let result = self
2026                .inner
2027                .http_client
2028                .send(
2029                    get_supported_versions::Request::new(),
2030                    request_config,
2031                    homeserver.clone(),
2032                    Some(&access_token),
2033                    (),
2034                    Default::default(),
2035                )
2036                .await;
2037
2038            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2039                result.as_ref().map_err(HttpError::client_api_error_kind)
2040            {
2041                // If the access token is actually expired, mark it as expired and fallback to
2042                // the unauthenticated request below.
2043                self.auth_ctx().set_access_token_expired(&access_token);
2044            } else {
2045                // If the request succeeded or it's an other error, just stop now.
2046                return result;
2047            }
2048        }
2049
2050        // Try without authentication.
2051        self.inner
2052            .http_client
2053            .send(
2054                get_supported_versions::Request::new(),
2055                request_config,
2056                homeserver.clone(),
2057                None,
2058                (),
2059                Default::default(),
2060            )
2061            .await
2062    }
2063
2064    /// Fetches client well_known from network; no caching.
2065    ///
2066    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2067    ///    well-known contents.
2068    /// 2. If it's not, we try extracting the server name from the
2069    ///    [`Client::user_id`] and building the server URL from it.
2070    /// 3. If we couldn't get the well-known contents with either the explicit
2071    ///    server name or the implicit extracted one, we try the homeserver URL
2072    ///    as a last resort.
2073    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2074        let homeserver = self.homeserver();
2075        let scheme = homeserver.scheme();
2076
2077        // Use the server name, either an explicit one or an implicit one taken from
2078        // the user id: sometimes we'll have only the homeserver url available and no
2079        // server name, but the server name can be extracted from the current user id.
2080        let server_url = self
2081            .server()
2082            .map(|server| server.to_string())
2083            // If the server name wasn't available, extract it from the user id and build a URL:
2084            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2085            // will be the same for the public server url, lacking a better candidate.
2086            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2087
2088        // If the server name is available, first try using it
2089        let response = if let Some(server_url) = server_url {
2090            // First try using the server name
2091            self.fetch_client_well_known_with_url(server_url).await
2092        } else {
2093            None
2094        };
2095
2096        // If we didn't get a well-known value yet, try with the homeserver url instead:
2097        if response.is_none() {
2098            // Sometimes people configure their well-known directly on the homeserver so use
2099            // this as a fallback when the server name is unknown.
2100            warn!(
2101                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2102            );
2103            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2104        } else {
2105            response
2106        }
2107    }
2108
2109    async fn fetch_client_well_known_with_url(
2110        &self,
2111        url: String,
2112    ) -> Option<discover_homeserver::Response> {
2113        let well_known = self
2114            .inner
2115            .http_client
2116            .send(
2117                discover_homeserver::Request::new(),
2118                Some(RequestConfig::short_retry()),
2119                url,
2120                None,
2121                (),
2122                Default::default(),
2123            )
2124            .await;
2125
2126        match well_known {
2127            Ok(well_known) => Some(well_known),
2128            Err(http_error) => {
2129                // It is perfectly valid to not have a well-known file.
2130                // Maybe we should check for a specific error code to be sure?
2131                warn!("Failed to fetch client well-known: {http_error}");
2132                None
2133            }
2134        }
2135    }
2136
2137    /// Load supported versions from storage, or fetch them from network and
2138    /// cache them.
2139    ///
2140    /// If `failsafe` is true, this will try to minimize side effects to avoid
2141    /// possible deadlocks.
2142    async fn load_or_fetch_supported_versions(
2143        &self,
2144        failsafe: bool,
2145    ) -> HttpResult<SupportedVersionsResponse> {
2146        match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2147            Ok(Some(stored)) => {
2148                if let Some(supported_versions) =
2149                    stored.into_supported_versions().and_then(|value| value.into_data())
2150                {
2151                    return Ok(supported_versions);
2152                }
2153            }
2154            Ok(None) => {
2155                // fallthrough: cache is empty
2156            }
2157            Err(err) => {
2158                warn!("error when loading cached supported versions: {err}");
2159                // fallthrough to network.
2160            }
2161        }
2162
2163        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2164        let supported_versions = SupportedVersionsResponse {
2165            versions: server_versions.versions,
2166            unstable_features: server_versions.unstable_features,
2167        };
2168
2169        // Only attempt to cache the result in storage if the request was authenticated.
2170        if self.auth_ctx().has_valid_access_token()
2171            && let Err(err) = self
2172                .state_store()
2173                .set_kv_data(
2174                    StateStoreDataKey::SupportedVersions,
2175                    StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2176                        supported_versions.clone(),
2177                    )),
2178                )
2179                .await
2180        {
2181            warn!("error when caching supported versions: {err}");
2182        }
2183
2184        Ok(supported_versions)
2185    }
2186
2187    pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2188        let supported_versions = &self.inner.caches.supported_versions;
2189
2190        if let CachedValue::Cached(val) = &*supported_versions.read().await {
2191            Some(val.clone())
2192        } else {
2193            None
2194        }
2195    }
2196
2197    /// Get the Matrix versions and features supported by the homeserver by
2198    /// fetching them from the server or the cache.
2199    ///
2200    /// This is equivalent to calling both [`Client::server_versions()`] and
2201    /// [`Client::unstable_features()`]. To always fetch the result from the
2202    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2203    /// and then `.as_supported_versions()` on the response.
2204    ///
2205    /// # Examples
2206    ///
2207    /// ```no_run
2208    /// use ruma::api::{FeatureFlag, MatrixVersion};
2209    /// # use matrix_sdk::{Client, config::SyncSettings};
2210    /// # use url::Url;
2211    /// # async {
2212    /// # let homeserver = Url::parse("http://localhost:8080")?;
2213    /// # let mut client = Client::new(homeserver).await?;
2214    ///
2215    /// let supported = client.supported_versions().await?;
2216    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2217    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2218    ///
2219    /// let msc_x_feature = FeatureFlag::from("msc_x");
2220    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2221    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2222    /// # anyhow::Ok(()) };
2223    /// ```
2224    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2225        self.supported_versions_inner(false).await
2226    }
2227
2228    /// Get the Matrix versions and features supported by the homeserver by
2229    /// fetching them from the server or the cache.
2230    ///
2231    /// If `failsafe` is true, this will try to minimize side effects to avoid
2232    /// possible deadlocks.
2233    pub(crate) async fn supported_versions_inner(
2234        &self,
2235        failsafe: bool,
2236    ) -> HttpResult<SupportedVersions> {
2237        let cached_supported_versions = &self.inner.caches.supported_versions;
2238        if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2239            return Ok(val.clone());
2240        }
2241
2242        let mut guarded_supported_versions = cached_supported_versions.write().await;
2243        if let CachedValue::Cached(val) = &*guarded_supported_versions {
2244            return Ok(val.clone());
2245        }
2246
2247        let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2248
2249        // Fill both unstable features and server versions at once.
2250        let mut supported_versions = supported.supported_versions();
2251        if supported_versions.versions.is_empty() {
2252            supported_versions.versions = [MatrixVersion::V1_0].into();
2253        }
2254
2255        // Only cache the result if the request was authenticated.
2256        if self.auth_ctx().has_valid_access_token() {
2257            *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2258        }
2259
2260        Ok(supported_versions)
2261    }
2262
2263    /// Get the Matrix versions and features supported by the homeserver by
2264    /// fetching them from the cache.
2265    ///
2266    /// For a version of this function that fetches the supported versions and
2267    /// features from the homeserver if the [`SupportedVersions`] aren't
2268    /// found in the cache, take a look at the [`Client::server_versions()`]
2269    /// method.
2270    ///
2271    /// # Examples
2272    ///
2273    /// ```no_run
2274    /// use ruma::api::{FeatureFlag, MatrixVersion};
2275    /// # use matrix_sdk::{Client, config::SyncSettings};
2276    /// # use url::Url;
2277    /// # async {
2278    /// # let homeserver = Url::parse("http://localhost:8080")?;
2279    /// # let mut client = Client::new(homeserver).await?;
2280    ///
2281    /// let supported =
2282    ///     if let Some(supported) = client.supported_versions_cached().await? {
2283    ///         supported
2284    ///     } else {
2285    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2286    ///     };
2287    ///
2288    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2289    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2290    ///
2291    /// let msc_x_feature = FeatureFlag::from("msc_x");
2292    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2293    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2294    /// # anyhow::Ok(()) };
2295    /// ```
2296    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2297        if let Some(cached) = self.get_cached_supported_versions().await {
2298            Ok(Some(cached))
2299        } else if let Some(stored) =
2300            self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2301        {
2302            if let Some(supported) =
2303                stored.into_supported_versions().and_then(|value| value.into_data())
2304            {
2305                Ok(Some(supported.supported_versions()))
2306            } else {
2307                Ok(None)
2308            }
2309        } else {
2310            Ok(None)
2311        }
2312    }
2313
2314    /// Get the Matrix versions supported by the homeserver by fetching them
2315    /// from the server or the cache.
2316    ///
2317    /// # Examples
2318    ///
2319    /// ```no_run
2320    /// use ruma::api::MatrixVersion;
2321    /// # use matrix_sdk::{Client, config::SyncSettings};
2322    /// # use url::Url;
2323    /// # async {
2324    /// # let homeserver = Url::parse("http://localhost:8080")?;
2325    /// # let mut client = Client::new(homeserver).await?;
2326    ///
2327    /// let server_versions = client.server_versions().await?;
2328    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2329    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2330    /// # anyhow::Ok(()) };
2331    /// ```
2332    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2333        Ok(self.supported_versions().await?.versions)
2334    }
2335
2336    /// Get the unstable features supported by the homeserver by fetching them
2337    /// from the server or the cache.
2338    ///
2339    /// # Examples
2340    ///
2341    /// ```no_run
2342    /// use matrix_sdk::ruma::api::FeatureFlag;
2343    /// # use matrix_sdk::{Client, config::SyncSettings};
2344    /// # use url::Url;
2345    /// # async {
2346    /// # let homeserver = Url::parse("http://localhost:8080")?;
2347    /// # let mut client = Client::new(homeserver).await?;
2348    ///
2349    /// let msc_x_feature = FeatureFlag::from("msc_x");
2350    /// let unstable_features = client.unstable_features().await?;
2351    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2352    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2353    /// # anyhow::Ok(()) };
2354    /// ```
2355    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2356        Ok(self.supported_versions().await?.features)
2357    }
2358
2359    /// Empty the supported versions and unstable features cache.
2360    ///
2361    /// Since the SDK caches the supported versions, it's possible to have a
    /// stale entry in the cache. This function makes it possible to force
2363    /// reset it.
2364    pub async fn reset_supported_versions(&self) -> Result<()> {
2365        // Empty the in-memory caches.
2366        self.inner.caches.supported_versions.write().await.take();
2367
2368        // Empty the store cache.
2369        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2370    }
2371
2372    /// Load well-known from storage, or fetch it from network and cache it.
2373    async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2374        match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2375            Ok(Some(stored)) => {
2376                if let Some(well_known) =
2377                    stored.into_well_known().and_then(|value| value.into_data())
2378                {
2379                    return Ok(well_known);
2380                }
2381            }
2382            Ok(None) => {
2383                // fallthrough: cache is empty
2384            }
2385            Err(err) => {
2386                warn!("error when loading cached well-known: {err}");
2387                // fallthrough to network.
2388            }
2389        }
2390
2391        let well_known = self.fetch_client_well_known().await.map(Into::into);
2392
2393        // Attempt to cache the result in storage.
2394        if let Err(err) = self
2395            .state_store()
2396            .set_kv_data(
2397                StateStoreDataKey::WellKnown,
2398                StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2399            )
2400            .await
2401        {
2402            warn!("error when caching well-known: {err}");
2403        }
2404
2405        Ok(well_known)
2406    }
2407
2408    async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2409        let well_known = &self.inner.caches.well_known;
2410        if let CachedValue::Cached(val) = &*well_known.read().await {
2411            return Ok(val.clone());
2412        }
2413
2414        let mut guarded_well_known = well_known.write().await;
2415        if let CachedValue::Cached(val) = &*guarded_well_known {
2416            return Ok(val.clone());
2417        }
2418
2419        let well_known = self.load_or_fetch_well_known().await?;
2420
2421        *guarded_well_known = CachedValue::Cached(well_known.clone());
2422
2423        Ok(well_known)
2424    }
2425
2426    /// Get information about the homeserver's advertised RTC foci by fetching
2427    /// the well-known file from the server or the cache.
2428    ///
2429    /// # Examples
2430    /// ```no_run
2431    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2432    /// # use url::Url;
2433    /// # async {
2434    /// # let homeserver = Url::parse("http://localhost:8080")?;
2435    /// # let mut client = Client::new(homeserver).await?;
2436    /// let rtc_foci = client.rtc_foci().await?;
2437    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2438    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2439    ///     _ => None,
2440    /// });
2441    /// if let Some(info) = default_livekit_focus_info {
2442    ///     println!("Default LiveKit service URL: {}", info.service_url);
2443    /// }
2444    /// # anyhow::Ok(()) };
2445    /// ```
2446    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2447        let well_known = self.get_or_load_and_cache_well_known().await?;
2448
2449        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2450    }
2451
2452    /// Empty the well-known cache.
2453    ///
2454    /// Since the SDK caches the well-known, it's possible to have a stale entry
    /// in the cache. This function makes it possible to force reset it.
2456    pub async fn reset_well_known(&self) -> Result<()> {
2457        // Empty the in-memory caches.
2458        self.inner.caches.well_known.write().await.take();
2459
2460        // Empty the store cache.
2461        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2462    }
2463
2464    /// Check whether MSC 4028 is enabled on the homeserver.
2465    ///
2466    /// # Examples
2467    ///
2468    /// ```no_run
2469    /// # use matrix_sdk::{Client, config::SyncSettings};
2470    /// # use url::Url;
2471    /// # async {
2472    /// # let homeserver = Url::parse("http://localhost:8080")?;
2473    /// # let mut client = Client::new(homeserver).await?;
2474    /// let msc4028_enabled =
2475    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2476    /// # anyhow::Ok(()) };
2477    /// ```
2478    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2479        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2480    }
2481
2482    /// Get information of all our own devices.
2483    ///
2484    /// # Examples
2485    ///
2486    /// ```no_run
2487    /// # use matrix_sdk::{Client, config::SyncSettings};
2488    /// # use url::Url;
2489    /// # async {
2490    /// # let homeserver = Url::parse("http://localhost:8080")?;
2491    /// # let mut client = Client::new(homeserver).await?;
2492    /// let response = client.devices().await?;
2493    ///
2494    /// for device in response.devices {
2495    ///     println!(
2496    ///         "Device: {} {}",
2497    ///         device.device_id,
2498    ///         device.display_name.as_deref().unwrap_or("")
2499    ///     );
2500    /// }
2501    /// # anyhow::Ok(()) };
2502    /// ```
2503    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2504        let request = get_devices::v3::Request::new();
2505
2506        self.send(request).await
2507    }
2508
2509    /// Delete the given devices from the server.
2510    ///
2511    /// # Arguments
2512    ///
2513    /// * `devices` - The list of devices that should be deleted from the
2514    ///   server.
2515    ///
2516    /// * `auth_data` - This request requires user interactive auth, the first
2517    ///   request needs to set this to `None` and will always fail with an
2518    ///   `UiaaResponse`. The response will contain information for the
2519    ///   interactive auth and the same request needs to be made but this time
2520    ///   with some `auth_data` provided.
2521    ///
2522    /// ```no_run
2523    /// # use matrix_sdk::{
2524    /// #    ruma::{api::client::uiaa, owned_device_id},
2525    /// #    Client, Error, config::SyncSettings,
2526    /// # };
2527    /// # use serde_json::json;
2528    /// # use url::Url;
2529    /// # use std::collections::BTreeMap;
2530    /// # async {
2531    /// # let homeserver = Url::parse("http://localhost:8080")?;
2532    /// # let mut client = Client::new(homeserver).await?;
2533    /// let devices = &[owned_device_id!("DEVICEID")];
2534    ///
2535    /// if let Err(e) = client.delete_devices(devices, None).await {
2536    ///     if let Some(info) = e.as_uiaa_response() {
2537    ///         let mut password = uiaa::Password::new(
2538    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2539    ///             "wordpass".to_owned(),
2540    ///         );
2541    ///         password.session = info.session.clone();
2542    ///
2543    ///         client
2544    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2545    ///             .await?;
2546    ///     }
2547    /// }
    /// # anyhow::Ok(()) };
    /// ```
2549    pub async fn delete_devices(
2550        &self,
2551        devices: &[OwnedDeviceId],
2552        auth_data: Option<uiaa::AuthData>,
2553    ) -> HttpResult<delete_devices::v3::Response> {
2554        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2555        request.auth = auth_data;
2556
2557        self.send(request).await
2558    }
2559
2560    /// Change the display name of a device owned by the current user.
2561    ///
2562    /// Returns a `update_device::Response` which specifies the result
2563    /// of the operation.
2564    ///
2565    /// # Arguments
2566    ///
2567    /// * `device_id` - The ID of the device to change the display name of.
2568    /// * `display_name` - The new display name to set.
2569    pub async fn rename_device(
2570        &self,
2571        device_id: &DeviceId,
2572        display_name: &str,
2573    ) -> HttpResult<update_device::v3::Response> {
2574        let mut request = update_device::v3::Request::new(device_id.to_owned());
2575        request.display_name = Some(display_name.to_owned());
2576
2577        self.send(request).await
2578    }
2579
2580    /// Check whether a device with a specific ID exists on the server.
2581    ///
2582    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2583    /// with 404 and the underlying error otherwise.
2584    ///
2585    /// # Arguments
2586    ///
2587    /// * `device_id` - The ID of the device to query.
2588    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2589        let request = device::get_device::v3::Request::new(device_id);
2590        match self.send(request).await {
2591            Ok(_) => Ok(true),
2592            Err(err) => {
2593                if let Some(error) = err.as_client_api_error()
2594                    && error.status_code == 404
2595                {
2596                    Ok(false)
2597                } else {
2598                    Err(err.into())
2599                }
2600            }
2601        }
2602    }
2603
2604    /// Synchronize the client's state with the latest state on the server.
2605    ///
2606    /// ## Syncing Events
2607    ///
2608    /// Messages or any other type of event need to be periodically fetched from
2609    /// the server, this is achieved by sending a `/sync` request to the server.
2610    ///
2611    /// The first sync is sent out without a [`token`]. The response of the
2612    /// first sync will contain a [`next_batch`] field which should then be
2613    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2614    /// don't receive the same events multiple times.
2615    ///
2616    /// ## Long Polling
2617    ///
2618    /// A sync should in the usual case always be in flight. The
    /// [`SyncSettings`] have a [`timeout`] option, which controls how
2620    /// long the server will wait for new events before it will respond.
2621    /// The server will respond immediately if some new events arrive before the
2622    /// timeout has expired. If no changes arrive and the timeout expires an
2623    /// empty sync response will be sent to the client.
2624    ///
2625    /// This method of sending a request that may not receive a response
2626    /// immediately is called long polling.
2627    ///
2628    /// ## Filtering Events
2629    ///
2630    /// The number or type of messages and events that the client should receive
2631    /// from the server can be altered using a [`Filter`].
2632    ///
2633    /// Filters can be non-trivial and, since they will be sent with every sync
2634    /// request, they may take up a bunch of unnecessary bandwidth.
2635    ///
    /// Luckily filters can be uploaded to the server and reused using a unique
2637    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2638    /// method.
2639    ///
2640    /// # Arguments
2641    ///
2642    /// * `sync_settings` - Settings for the sync call, this allows us to set
    ///   various options to configure the sync:
2644    ///     * [`filter`] - To configure which events we receive and which get
2645    ///       [filtered] by the server
2646    ///     * [`timeout`] - To configure our [long polling] setup.
2647    ///     * [`token`] - To tell the server which events we already received
2648    ///       and where we wish to continue syncing.
2649    ///     * [`full_state`] - To tell the server that we wish to receive all
2650    ///       state events, regardless of our configured [`token`].
2651    ///     * [`set_presence`] - To tell the server to set the presence and to
2652    ///       which state.
2653    ///
2654    /// # Examples
2655    ///
2656    /// ```no_run
2657    /// # use url::Url;
2658    /// # async {
2659    /// # let homeserver = Url::parse("http://localhost:8080")?;
2660    /// # let username = "";
2661    /// # let password = "";
2662    /// use matrix_sdk::{
2663    ///     Client, config::SyncSettings,
2664    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2665    /// };
2666    ///
2667    /// let client = Client::new(homeserver).await?;
2668    /// client.matrix_auth().login_username(username, password).send().await?;
2669    ///
2670    /// // Sync once so we receive the client state and old messages.
2671    /// client.sync_once(SyncSettings::default()).await?;
2672    ///
2673    /// // Register our handler so we start responding once we receive a new
2674    /// // event.
2675    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2676    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2677    /// });
2678    ///
2679    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2680    /// // from our `sync_once()` call automatically.
2681    /// client.sync(SyncSettings::default()).await;
2682    /// # anyhow::Ok(()) };
2683    /// ```
2684    ///
2685    /// [`sync`]: #method.sync
2686    /// [`SyncSettings`]: crate::config::SyncSettings
2687    /// [`token`]: crate::config::SyncSettings#method.token
2688    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2689    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2690    /// [`set_presence`]: ruma::presence::PresenceState
2691    /// [`filter`]: crate::config::SyncSettings#method.filter
2692    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2693    /// [`next_batch`]: SyncResponse#structfield.next_batch
2694    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2695    /// [long polling]: #long-polling
2696    /// [filtered]: #filtering-events
2697    #[instrument(skip(self))]
2698    pub async fn sync_once(
2699        &self,
2700        sync_settings: crate::config::SyncSettings,
2701    ) -> Result<SyncResponse> {
2702        // The sync might not return for quite a while due to the timeout.
2703        // We'll see if there's anything crypto related to send out before we
2704        // sync, i.e. if we closed our client after a sync but before the
2705        // crypto requests were sent out.
2706        //
2707        // This will mostly be a no-op.
2708        #[cfg(feature = "e2e-encryption")]
2709        if let Err(e) = self.send_outgoing_requests().await {
2710            error!(error = ?e, "Error while sending outgoing E2EE requests");
2711        }
2712
2713        let token = match sync_settings.token {
2714            SyncToken::Specific(token) => Some(token),
2715            SyncToken::NoToken => None,
2716            SyncToken::ReusePrevious => self.sync_token().await,
2717        };
2718
2719        let request = assign!(sync_events::v3::Request::new(), {
2720            filter: sync_settings.filter.map(|f| *f),
2721            since: token,
2722            full_state: sync_settings.full_state,
2723            set_presence: sync_settings.set_presence,
2724            timeout: sync_settings.timeout,
2725            use_state_after: true,
2726        });
2727        let mut request_config = self.request_config();
2728        if let Some(timeout) = sync_settings.timeout {
2729            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2730            request_config.timeout = Some(base_timeout + timeout);
2731        }
2732
2733        let response = self.send(request).with_request_config(request_config).await?;
2734        let next_batch = response.next_batch.clone();
2735        let response = self.process_sync(response).await?;
2736
2737        #[cfg(feature = "e2e-encryption")]
2738        if let Err(e) = self.send_outgoing_requests().await {
2739            error!(error = ?e, "Error while sending outgoing E2EE requests");
2740        }
2741
2742        self.inner.sync_beat.notify(usize::MAX);
2743
2744        Ok(SyncResponse::new(next_batch, response))
2745    }
2746
2747    /// Repeatedly synchronize the client state with the server.
2748    ///
2749    /// This method will only return on error, if cancellation is needed
2750    /// the method should be wrapped in a cancelable task or the
2751    /// [`Client::sync_with_callback`] method can be used or
2752    /// [`Client::sync_with_result_callback`] if you want to handle error
2753    /// cases in the loop, too.
2754    ///
2755    /// This method will internally call [`Client::sync_once`] in a loop.
2756    ///
2757    /// This method can be used with the [`Client::add_event_handler`]
2758    /// method to react to individual events. If you instead wish to handle
2759    /// events in a bulk manner the [`Client::sync_with_callback`],
2760    /// [`Client::sync_with_result_callback`] and
2761    /// [`Client::sync_stream`] methods can be used instead. Those methods
2762    /// repeatedly return the whole sync response.
2763    ///
2764    /// # Arguments
2765    ///
2766    /// * `sync_settings` - Settings for the sync call. *Note* that those
2767    ///   settings will be only used for the first sync call. See the argument
2768    ///   docs for [`Client::sync_once`] for more info.
2769    ///
2770    /// # Return
2771    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2772    /// up to the user of the API to check the error and decide whether the sync
2773    /// should continue or not.
2774    ///
2775    /// # Examples
2776    ///
2777    /// ```no_run
2778    /// # use url::Url;
2779    /// # async {
2780    /// # let homeserver = Url::parse("http://localhost:8080")?;
2781    /// # let username = "";
2782    /// # let password = "";
2783    /// use matrix_sdk::{
2784    ///     Client, config::SyncSettings,
2785    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2786    /// };
2787    ///
2788    /// let client = Client::new(homeserver).await?;
2789    /// client.matrix_auth().login_username(&username, &password).send().await?;
2790    ///
2791    /// // Register our handler so we start responding once we receive a new
2792    /// // event.
2793    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2794    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2795    /// });
2796    ///
2797    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2798    /// // automatically.
2799    /// client.sync(SyncSettings::default()).await?;
2800    /// # anyhow::Ok(()) };
2801    /// ```
2802    ///
2803    /// [argument docs]: #method.sync_once
2804    /// [`sync_with_callback`]: #method.sync_with_callback
2805    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2806        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2807    }
2808
2809    /// Repeatedly call sync to synchronize the client state with the server.
2810    ///
2811    /// # Arguments
2812    ///
2813    /// * `sync_settings` - Settings for the sync call. *Note* that those
2814    ///   settings will be only used for the first sync call. See the argument
2815    ///   docs for [`Client::sync_once`] for more info.
2816    ///
2817    /// * `callback` - A callback that will be called every time a successful
    ///   response has been fetched from the server. The callback must return a
    ///   `LoopCtrl` which signals whether the method should keep syncing. If
    ///   the callback returns `LoopCtrl::Continue` the sync will continue, if
    ///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2822    ///
2823    /// # Return
2824    /// The sync runs until an error occurs or the
2825    /// callback indicates that the Loop should stop. If the callback asked for
2826    /// a regular stop, the result will be `Ok(())` otherwise the
2827    /// `Err(Error)` is returned.
2828    ///
2829    /// # Examples
2830    ///
2831    /// The following example demonstrates how to sync forever while sending all
2832    /// the interesting events through a mpsc channel to another thread e.g. a
2833    /// UI thread.
2834    ///
2835    /// ```no_run
2836    /// # use std::time::Duration;
2837    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2838    /// # use url::Url;
2839    /// # async {
2840    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2841    /// # let mut client = Client::new(homeserver).await.unwrap();
2842    ///
2843    /// use tokio::sync::mpsc::channel;
2844    ///
2845    /// let (tx, rx) = channel(100);
2846    ///
2847    /// let sync_channel = &tx;
2848    /// let sync_settings = SyncSettings::new()
2849    ///     .timeout(Duration::from_secs(30));
2850    ///
2851    /// client
2852    ///     .sync_with_callback(sync_settings, |response| async move {
2853    ///         let channel = sync_channel;
2854    ///         for (room_id, room) in response.rooms.joined {
2855    ///             for event in room.timeline.events {
2856    ///                 channel.send(event).await.unwrap();
2857    ///             }
2858    ///         }
2859    ///
2860    ///         LoopCtrl::Continue
2861    ///     })
2862    ///     .await;
2863    /// };
2864    /// ```
2865    #[instrument(skip_all)]
2866    pub async fn sync_with_callback<C>(
2867        &self,
2868        sync_settings: crate::config::SyncSettings,
2869        callback: impl Fn(SyncResponse) -> C,
2870    ) -> Result<(), Error>
2871    where
2872        C: Future<Output = LoopCtrl>,
2873    {
2874        self.sync_with_result_callback(sync_settings, |result| async {
2875            Ok(callback(result?).await)
2876        })
2877        .await
2878    }
2879
2880    /// Repeatedly call sync to synchronize the client state with the server.
2881    ///
2882    /// # Arguments
2883    ///
2884    /// * `sync_settings` - Settings for the sync call. *Note* that those
2885    ///   settings will be only used for the first sync call. See the argument
2886    ///   docs for [`Client::sync_once`] for more info.
2887    ///
2888    /// * `callback` - A callback that will be called every time after a
2889    ///   response has been received, failure or not. The callback returns a
2890    ///   `Result<LoopCtrl, Error>`, too. When returning
2891    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2892    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2893    ///   function returns `Ok(())`. In case the callback can't handle the
2894    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2895    ///   which results in the sync ending and the `Err(Error)` being returned.
2896    ///
2897    /// # Return
2898    /// The sync runs until an error occurs that the callback can't handle or
2899    /// the callback indicates that the Loop should stop. If the callback
2900    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2901    /// `Err(Error)` is returned.
2902    ///
    /// _Note_: Lower-level configuration (e.g. for retries) is not changed by
    /// this, and is handled first without sending the result to the
    /// callback. Only once it has been exhausted is the `Result` handed to
    /// the callback.
2907    ///
2908    /// # Examples
2909    ///
2910    /// The following example demonstrates how to sync forever while sending all
2911    /// the interesting events through a mpsc channel to another thread e.g. a
2912    /// UI thread.
2913    ///
2914    /// ```no_run
2915    /// # use std::time::Duration;
2916    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2917    /// # use url::Url;
2918    /// # async {
2919    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2920    /// # let mut client = Client::new(homeserver).await.unwrap();
2921    /// #
2922    /// use tokio::sync::mpsc::channel;
2923    ///
2924    /// let (tx, rx) = channel(100);
2925    ///
2926    /// let sync_channel = &tx;
2927    /// let sync_settings = SyncSettings::new()
2928    ///     .timeout(Duration::from_secs(30));
2929    ///
2930    /// client
2931    ///     .sync_with_result_callback(sync_settings, |response| async move {
2932    ///         let channel = sync_channel;
2933    ///         let sync_response = response?;
2934    ///         for (room_id, room) in sync_response.rooms.joined {
2935    ///              for event in room.timeline.events {
2936    ///                  channel.send(event).await.unwrap();
2937    ///               }
2938    ///         }
2939    ///
2940    ///         Ok(LoopCtrl::Continue)
2941    ///     })
2942    ///     .await;
2943    /// };
2944    /// ```
2945    #[instrument(skip(self, callback))]
2946    pub async fn sync_with_result_callback<C>(
2947        &self,
2948        sync_settings: crate::config::SyncSettings,
2949        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2950    ) -> Result<(), Error>
2951    where
2952        C: Future<Output = Result<LoopCtrl, Error>>,
2953    {
2954        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2955
2956        while let Some(result) = sync_stream.next().await {
2957            trace!("Running callback");
2958            if callback(result).await? == LoopCtrl::Break {
2959                trace!("Callback told us to stop");
2960                break;
2961            }
2962            trace!("Done running callback");
2963        }
2964
2965        Ok(())
2966    }
2967
    /// Repeatedly synchronize the client state with the server.
2969    ///
2970    /// This method will internally call [`Client::sync_once`] in a loop and is
2971    /// equivalent to the [`Client::sync`] method but the responses are provided
2972    /// as an async stream.
2973    ///
2974    /// # Arguments
2975    ///
2976    /// * `sync_settings` - Settings for the sync call. *Note* that those
2977    ///   settings will be only used for the first sync call. See the argument
2978    ///   docs for [`Client::sync_once`] for more info.
2979    ///
2980    /// # Examples
2981    ///
2982    /// ```no_run
2983    /// # use url::Url;
2984    /// # async {
2985    /// # let homeserver = Url::parse("http://localhost:8080")?;
2986    /// # let username = "";
2987    /// # let password = "";
2988    /// use futures_util::StreamExt;
2989    /// use matrix_sdk::{Client, config::SyncSettings};
2990    ///
2991    /// let client = Client::new(homeserver).await?;
2992    /// client.matrix_auth().login_username(&username, &password).send().await?;
2993    ///
2994    /// let mut sync_stream =
2995    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2996    ///
2997    /// while let Some(Ok(response)) = sync_stream.next().await {
2998    ///     for room in response.rooms.joined.values() {
2999    ///         for e in &room.timeline.events {
3000    ///             if let Ok(event) = e.raw().deserialize() {
3001    ///                 println!("Received event {:?}", event);
3002    ///             }
3003    ///         }
3004    ///     }
3005    /// }
3006    ///
3007    /// # anyhow::Ok(()) };
3008    /// ```
3009    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3010    #[instrument(skip(self))]
    pub async fn sync_stream(
        &self,
        mut sync_settings: crate::config::SyncSettings,
    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
        // State carried across iterations of the generator below.
        let mut is_first_sync = true;
        // Holds the caller's timeout while it is withheld from the first sync.
        let mut timeout = None;
        // Passed to `delay_sync` after every iteration.
        // NOTE(review): presumably used to throttle consecutive syncs — confirm
        // against `Client::delay_sync`.
        let mut last_sync_time: Option<Instant> = None;

        // Captured once, when this method is called; every sync future below
        // is instrumented with it.
        let parent_span = Span::current();

        async_stream::stream!({
            loop {
                trace!("Syncing");

                if sync_settings.ignore_timeout_on_first_sync {
                    if is_first_sync {
                        // First iteration: move the timeout out of the
                        // settings so this sync runs without one.
                        timeout = sync_settings.timeout.take();
                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
                        // Later iteration: put the withheld timeout back,
                        // unless a timeout is already set.
                        sync_settings.timeout = timeout.take();
                    }

                    is_first_sync = false;
                }

                // Yield the outcome of one sync, success or failure; the
                // stream itself never ends.
                yield self
                    .sync_loop_helper(&mut sync_settings)
                    .instrument(parent_span.clone())
                    .await;

                Client::delay_sync(&mut last_sync_time).await
            }
        })
    }
3044
3045    /// Get the current, if any, sync token of the client.
3046    /// This will be None if the client didn't sync at least once.
3047    pub(crate) async fn sync_token(&self) -> Option<String> {
3048        self.inner.base_client.sync_token().await
3049    }
3050
3051    /// Gets information about the owner of a given access token.
3052    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3053        let request = whoami::v3::Request::new();
3054        self.send(request).await
3055    }
3056
3057    /// Subscribes a new receiver to client SessionChange broadcasts.
3058    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3059        let broadcast = &self.auth_ctx().session_change_sender;
3060        broadcast.subscribe()
3061    }
3062
3063    /// Sets the save/restore session callbacks.
3064    ///
3065    /// This is another mechanism to get synchronous updates to session tokens,
3066    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3067    pub fn set_session_callbacks(
3068        &self,
3069        reload_session_callback: Box<ReloadSessionCallback>,
3070        save_session_callback: Box<SaveSessionCallback>,
3071    ) -> Result<()> {
3072        self.inner
3073            .auth_ctx
3074            .reload_session_callback
3075            .set(reload_session_callback)
3076            .map_err(|_| Error::MultipleSessionCallbacks)?;
3077
3078        self.inner
3079            .auth_ctx
3080            .save_session_callback
3081            .set(save_session_callback)
3082            .map_err(|_| Error::MultipleSessionCallbacks)?;
3083
3084        Ok(())
3085    }
3086
3087    /// Get the notification settings of the current owner of the client.
3088    pub async fn notification_settings(&self) -> NotificationSettings {
3089        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3090        NotificationSettings::new(self.clone(), ruleset)
3091    }
3092
3093    /// Create a new specialized `Client` that can process notifications.
3094    ///
3095    /// See [`CrossProcessLock::new`] to learn more about
3096    /// `cross_process_lock_config`.
3097    ///
3098    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3099    pub async fn notification_client(
3100        &self,
3101        cross_process_lock_config: CrossProcessLockConfig,
3102    ) -> Result<Client> {
3103        let client = Client {
3104            inner: ClientInner::new(
3105                self.inner.auth_ctx.clone(),
3106                self.server().cloned(),
3107                self.homeserver(),
3108                self.sliding_sync_version(),
3109                self.inner.http_client.clone(),
3110                self.inner
3111                    .base_client
3112                    .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3113                    .await?,
3114                self.inner.caches.supported_versions.read().await.clone(),
3115                self.inner.caches.well_known.read().await.clone(),
3116                self.inner.respect_login_well_known,
3117                self.inner.event_cache.clone(),
3118                self.inner.send_queue_data.clone(),
3119                self.inner.latest_events.clone(),
3120                #[cfg(feature = "e2e-encryption")]
3121                self.inner.e2ee.encryption_settings,
3122                #[cfg(feature = "e2e-encryption")]
3123                self.inner.enable_share_history_on_invite,
3124                cross_process_lock_config,
3125                #[cfg(feature = "experimental-search")]
3126                self.inner.search_index.clone(),
3127                self.inner.thread_subscription_catchup.clone(),
3128            )
3129            .await,
3130        };
3131
3132        Ok(client)
3133    }
3134
3135    /// The [`EventCache`] instance for this [`Client`].
3136    pub fn event_cache(&self) -> &EventCache {
3137        // SAFETY: always initialized in the `Client` ctor.
3138        self.inner.event_cache.get().unwrap()
3139    }
3140
3141    /// The [`LatestEvents`] instance for this [`Client`].
3142    pub async fn latest_events(&self) -> &LatestEvents {
3143        self.inner
3144            .latest_events
3145            .get_or_init(|| async {
3146                LatestEvents::new(
3147                    WeakClient::from_client(self),
3148                    self.event_cache().clone(),
3149                    SendQueue::new(self.clone()),
3150                    self.room_info_notable_update_receiver(),
3151                )
3152            })
3153            .await
3154    }
3155
3156    /// Waits until an at least partially synced room is received, and returns
3157    /// it.
3158    ///
3159    /// **Note: this function will loop endlessly until either it finds the room
3160    /// or an externally set timeout happens.**
3161    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3162        loop {
3163            if let Some(room) = self.get_room(room_id) {
3164                if room.is_state_partially_or_fully_synced() {
3165                    debug!("Found just created room!");
3166                    return room;
3167                }
3168                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3169            } else {
3170                debug!("Room wasn't found, waiting for sync beat to try again");
3171            }
3172            self.inner.sync_beat.listen().await;
3173        }
3174    }
3175
3176    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3177    /// join it.
3178    pub async fn knock(
3179        &self,
3180        room_id_or_alias: OwnedRoomOrAliasId,
3181        reason: Option<String>,
3182        server_names: Vec<OwnedServerName>,
3183    ) -> Result<Room> {
3184        let request =
3185            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3186        let response = self.send(request).await?;
3187        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3188        Ok(Room::new(self.clone(), base_room))
3189    }
3190
3191    /// Checks whether the provided `user_id` belongs to an ignored user.
3192    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3193        self.base_client().is_user_ignored(user_id).await
3194    }
3195
3196    /// Gets the `max_upload_size` value from the homeserver, getting either a
3197    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3198    /// missing.
3199    ///
3200    /// Check the spec for more info:
3201    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3202    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3203        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3204        if let Some(data) = max_upload_size_lock.get() {
3205            return Ok(data.to_owned());
3206        }
3207
3208        // Use the authenticated endpoint when the server supports it.
3209        let supported_versions = self.supported_versions().await?;
3210        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3211            .is_supported(&supported_versions);
3212
3213        let upload_size = if use_auth {
3214            self.send(authenticated_media::get_media_config::v1::Request::default())
3215                .await?
3216                .upload_size
3217        } else {
3218            #[allow(deprecated)]
3219            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3220        };
3221
3222        match max_upload_size_lock.set(upload_size) {
3223            Ok(_) => Ok(upload_size),
3224            Err(error) => {
3225                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3226            }
3227        }
3228    }
3229
3230    /// The settings to use for decrypting events.
3231    #[cfg(feature = "e2e-encryption")]
3232    pub fn decryption_settings(&self) -> &DecryptionSettings {
3233        &self.base_client().decryption_settings
3234    }
3235
3236    /// Returns the [`SearchIndex`] for this [`Client`].
3237    #[cfg(feature = "experimental-search")]
3238    pub fn search_index(&self) -> &SearchIndex {
3239        &self.inner.search_index
3240    }
3241
3242    /// Whether the client is configured to take thread subscriptions (MSC4306
3243    /// and MSC4308) into account, and the server enabled the experimental
3244    /// feature flag for it.
3245    ///
3246    /// This may cause filtering out of thread subscriptions, and loading the
3247    /// thread subscriptions via the sliding sync extension, when the room
3248    /// list service is being used.
3249    ///
3250    /// This is async and fallible as it may use the network to retrieve the
3251    /// server supported features, if they aren't cached already.
3252    pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3253        // Check if the client is configured to support thread subscriptions first.
3254        match self.base_client().threading_support {
3255            ThreadingSupport::Enabled { with_subscriptions: false }
3256            | ThreadingSupport::Disabled => return Ok(false),
3257            ThreadingSupport::Enabled { with_subscriptions: true } => {}
3258        }
3259
3260        // Now, let's check that the server supports it.
3261        let server_enabled = self
3262            .supported_versions()
3263            .await?
3264            .features
3265            .contains(&FeatureFlag::from("org.matrix.msc4306"));
3266
3267        Ok(server_enabled)
3268    }
3269
3270    /// Fetch thread subscriptions changes between `from` and up to `to`.
3271    ///
3272    /// The `limit` optional parameter can be used to limit the number of
3273    /// entries in a response. It can also be overridden by the server, if
3274    /// it's deemed too large.
3275    pub async fn fetch_thread_subscriptions(
3276        &self,
3277        from: Option<String>,
3278        to: Option<String>,
3279        limit: Option<UInt>,
3280    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3281        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3282            from,
3283            to,
3284            limit,
3285        });
3286        Ok(self.send(request).await?)
3287    }
3288
3289    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3290        self.inner.thread_subscription_catchup.get().unwrap()
3291    }
3292
3293    /// Perform database optimizations if any are available, i.e. vacuuming in
3294    /// SQLite.
3295    ///
3296    /// **Warning:** this was added to check if SQLite fragmentation was the
3297    /// source of performance issues, **DO NOT use in production**.
3298    #[doc(hidden)]
3299    pub async fn optimize_stores(&self) -> Result<()> {
3300        trace!("Optimizing state store...");
3301        self.state_store().optimize().await?;
3302
3303        trace!("Optimizing event cache store...");
3304        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3305            clean_lock.optimize().await?;
3306        }
3307
3308        trace!("Optimizing media store...");
3309        self.media_store().lock().await?.optimize().await?;
3310
3311        Ok(())
3312    }
3313
3314    /// Returns the sizes of the existing stores, if known.
3315    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3316        #[cfg(feature = "e2e-encryption")]
3317        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3318            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3319        {
3320            Some(store_size)
3321        } else {
3322            None
3323        };
3324        #[cfg(not(feature = "e2e-encryption"))]
3325        let crypto_store_size = None;
3326
3327        let state_store_size = self.state_store().get_size().await.ok().flatten();
3328
3329        let event_cache_store_size = if let Some(clean_lock) =
3330            self.event_cache_store().lock().await?.as_clean()
3331            && let Ok(Some(store_size)) = clean_lock.get_size().await
3332        {
3333            Some(store_size)
3334        } else {
3335            None
3336        };
3337
3338        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3339
3340        Ok(StoreSizes {
3341            crypto_store: crypto_store_size,
3342            state_store: state_store_size,
3343            event_cache_store: event_cache_store_size,
3344            media_store: media_store_size,
3345        })
3346    }
3347
3348    /// Get a reference to the client's task monitor, for spawning background
3349    /// tasks.
3350    pub fn task_monitor(&self) -> &TaskMonitor {
3351        &self.inner.task_monitor
3352    }
3353
3354    /// Add a subscriber for duplicate key upload error notifications triggered
3355    /// by requests to /keys/upload.
3356    #[cfg(feature = "e2e-encryption")]
3357    pub fn subscribe_to_duplicate_key_upload_errors(
3358        &self,
3359    ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3360        self.inner.duplicate_key_upload_error_sender.subscribe()
3361    }
3362
3363    /// Check the record of whether we are waiting for an [MSC4268] key bundle
3364    /// for the given room.
3365    ///
3366    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3367    #[cfg(feature = "e2e-encryption")]
3368    pub async fn get_pending_key_bundle_details_for_room(
3369        &self,
3370        room_id: &RoomId,
3371    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3372        Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3373    }
3374}
3375
/// Contains the disk size of the different stores, if known. It won't be
/// available for in-memory stores.
///
/// Each field is `None` when the size of the matching store is unknown; the
/// [`Default`] value has every field set to `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreSizes {
    /// The size of the CryptoStore.
    pub crypto_store: Option<usize>,
    /// The size of the StateStore.
    pub state_store: Option<usize>,
    /// The size of the EventCacheStore.
    pub event_cache_store: Option<usize>,
    /// The size of the MediaStore.
    pub media_store: Option<usize>,
}
3389
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper asking the crypto layer to track the given users.
    ///
    /// # Panics
    ///
    /// Panics if the client has no Olm machine, or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let olm_guard = self.olm_machine().await;
        let olm_machine = olm_guard.as_ref().unwrap();
        olm_machine.update_tracked_users(user_ids).await.unwrap();
    }
}
3403
3404/// A weak reference to the inner client, useful when trying to get a handle
3405/// on the owning client.
3406#[derive(Clone, Debug)]
3407pub(crate) struct WeakClient {
3408    client: Weak<ClientInner>,
3409}
3410
3411impl WeakClient {
3412    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3413    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3414        Self { client: Arc::downgrade(client) }
3415    }
3416
3417    /// Construct a [`WeakClient`] from a [`Client`].
3418    pub fn from_client(client: &Client) -> Self {
3419        Self::from_inner(&client.inner)
3420    }
3421
3422    /// Attempts to get a [`Client`] from this [`WeakClient`].
3423    pub fn get(&self) -> Option<Client> {
3424        self.client.upgrade().map(|inner| Client { inner })
3425    }
3426
3427    /// Gets the number of strong (`Arc`) pointers still pointing to this
3428    /// client.
3429    #[allow(dead_code)]
3430    pub fn strong_count(&self) -> usize {
3431        self.client.strong_count()
3432    }
3433}
3434
3435/// Information about the state of a room before we joined it.
3436#[derive(Debug, Clone, Default)]
3437struct PreJoinRoomInfo {
3438    /// The user who invited us to the room, if any.
3439    pub inviter: Option<RoomMember>,
3440}
3441
3442// The http mocking library is not supported for wasm32
3443#[cfg(all(test, not(target_family = "wasm")))]
3444pub(crate) mod tests {
3445    use std::{sync::Arc, time::Duration};
3446
3447    use assert_matches::assert_matches;
3448    use assert_matches2::assert_let;
3449    use eyeball::SharedObservable;
3450    use futures_util::{FutureExt, StreamExt, pin_mut};
3451    use js_int::{UInt, uint};
3452    use matrix_sdk_base::{
3453        RoomState,
3454        store::{MemoryStore, StoreConfig},
3455    };
3456    use matrix_sdk_test::{
3457        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3458        event_factory::EventFactory,
3459    };
3460    #[cfg(target_family = "wasm")]
3461    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3462
3463    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3464    use ruma::{
3465        RoomId, ServerName, UserId,
3466        api::{
3467            FeatureFlag, MatrixVersion,
3468            client::{
3469                discovery::discover_homeserver::RtcFocusInfo,
3470                room::create_room::v3::Request as CreateRoomRequest,
3471            },
3472        },
3473        assign,
3474        events::{
3475            ignored_user_list::IgnoredUserListEventContent,
3476            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3477        },
3478        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3479    };
3480    use serde_json::json;
3481    use stream_assert::{assert_next_matches, assert_pending};
3482    use tokio::{
3483        spawn,
3484        time::{sleep, timeout},
3485    };
3486    use url::Url;
3487
3488    use super::Client;
3489    use crate::{
3490        Error, TransmissionProgress,
3491        client::{WeakClient, futures::SendMediaUploadRequest},
3492        config::{RequestConfig, SyncSettings},
3493        futures::SendRequest,
3494        media::MediaError,
3495        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3496    };
3497
3498    #[async_test]
3499    async fn test_account_data() {
3500        let server = MatrixMockServer::new().await;
3501        let client = server.client_builder().build().await;
3502
3503        let f = EventFactory::new();
3504        server
3505            .mock_sync()
3506            .ok_and_run(&client, |builder| {
3507                builder.add_global_account_data(
3508                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3509                );
3510            })
3511            .await;
3512
3513        let content = client
3514            .account()
3515            .account_data::<IgnoredUserListEventContent>()
3516            .await
3517            .unwrap()
3518            .unwrap()
3519            .deserialize()
3520            .unwrap();
3521
3522        assert_eq!(content.ignored_users.len(), 1);
3523    }
3524
3525    #[async_test]
3526    async fn test_successful_discovery() {
3527        // Imagine this is `matrix.org`.
3528        let server = MatrixMockServer::new().await;
3529        let server_url = server.uri();
3530
3531        // Imagine this is `matrix-client.matrix.org`.
3532        let homeserver = MatrixMockServer::new().await;
3533        let homeserver_url = homeserver.uri();
3534
3535        // Imagine Alice has the user ID `@alice:matrix.org`.
3536        let domain = server_url.strip_prefix("http://").unwrap();
3537        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3538
3539        // The `.well-known` is on the server (e.g. `matrix.org`).
3540        server
3541            .mock_well_known()
3542            .ok_with_homeserver_url(&homeserver_url)
3543            .mock_once()
3544            .named("well-known")
3545            .mount()
3546            .await;
3547
3548        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3549        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3550
3551        let client = Client::builder()
3552            .insecure_server_name_no_tls(alice.server_name())
3553            .build()
3554            .await
3555            .unwrap();
3556
3557        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3558        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3559        client.server_versions().await.unwrap();
3560    }
3561
3562    #[async_test]
3563    async fn test_discovery_broken_server() {
3564        let server = MatrixMockServer::new().await;
3565        let server_url = server.uri();
3566        let domain = server_url.strip_prefix("http://").unwrap();
3567        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3568
3569        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3570
3571        assert!(
3572            Client::builder()
3573                .insecure_server_name_no_tls(alice.server_name())
3574                .build()
3575                .await
3576                .is_err(),
3577            "Creating a client from a user ID should fail when the .well-known request fails."
3578        );
3579    }
3580
3581    #[async_test]
3582    async fn test_room_creation() {
3583        let server = MatrixMockServer::new().await;
3584        let client = server.client_builder().build().await;
3585
3586        let f = EventFactory::new().sender(user_id!("@example:localhost"));
3587        server
3588            .mock_sync()
3589            .ok_and_run(&client, |builder| {
3590                builder.add_joined_room(
3591                    JoinedRoomBuilder::default()
3592                        .add_state_event(
3593                            f.member(user_id!("@example:localhost")).display_name("example"),
3594                        )
3595                        .add_state_event(f.default_power_levels()),
3596                );
3597            })
3598            .await;
3599
3600        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3601        assert_eq!(room.state(), RoomState::Joined);
3602    }
3603
3604    #[async_test]
3605    async fn test_retry_limit_http_requests() {
3606        let server = MatrixMockServer::new().await;
3607        let client = server
3608            .client_builder()
3609            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3610            .build()
3611            .await;
3612
3613        assert!(client.request_config().retry_limit.unwrap() == 4);
3614
3615        server.mock_who_am_i().error500().expect(4).mount().await;
3616
3617        client.whoami().await.unwrap_err();
3618    }
3619
3620    #[async_test]
3621    async fn test_retry_timeout_http_requests() {
3622        // Keep this timeout small so that the test doesn't take long
3623        let retry_timeout = Duration::from_secs(5);
3624        let server = MatrixMockServer::new().await;
3625        let client = server
3626            .client_builder()
3627            .on_builder(|builder| {
3628                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3629            })
3630            .build()
3631            .await;
3632
3633        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3634
3635        server.mock_login().error500().expect(2..).mount().await;
3636
3637        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3638    }
3639
3640    #[async_test]
3641    async fn test_short_retry_initial_http_requests() {
3642        let server = MatrixMockServer::new().await;
3643        let client = server
3644            .client_builder()
3645            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3646            .build()
3647            .await;
3648
3649        server.mock_login().error500().expect(3..).mount().await;
3650
3651        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3652    }
3653
3654    #[async_test]
3655    async fn test_no_retry_http_requests() {
3656        let server = MatrixMockServer::new().await;
3657        let client = server.client_builder().build().await;
3658
3659        server.mock_devices().error500().mock_once().mount().await;
3660
3661        client.devices().await.unwrap_err();
3662    }
3663
3664    #[async_test]
3665    async fn test_set_homeserver() {
3666        let client = MockClientBuilder::new(None).build().await;
3667        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3668
3669        let homeserver = Url::parse("http://example.com/").unwrap();
3670        client.set_homeserver(homeserver.clone());
3671        assert_eq!(client.homeserver(), homeserver);
3672    }
3673
3674    #[async_test]
3675    async fn test_search_user_request() {
3676        let server = MatrixMockServer::new().await;
3677        let client = server.client_builder().build().await;
3678
3679        server.mock_user_directory().ok().mock_once().mount().await;
3680
3681        let response = client.search_users("test", 50).await.unwrap();
3682        assert_eq!(response.results.len(), 1);
3683        let result = &response.results[0];
3684        assert_eq!(result.user_id.to_string(), "@test:example.me");
3685        assert_eq!(result.display_name.clone().unwrap(), "Test");
3686        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3687        assert!(!response.limited);
3688    }
3689
3690    #[async_test]
3691    async fn test_request_unstable_features() {
3692        let server = MatrixMockServer::new().await;
3693        let client = server.client_builder().no_server_versions().build().await;
3694
3695        server
3696            .mock_versions()
3697            .with_feature("org.matrix.e2e_cross_signing", true)
3698            .ok()
3699            .mock_once()
3700            .mount()
3701            .await;
3702
3703        let unstable_features = client.unstable_features().await.unwrap();
3704        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3705        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3706    }
3707
3708    #[async_test]
3709    async fn test_can_homeserver_push_encrypted_event_to_device() {
3710        let server = MatrixMockServer::new().await;
3711        let client = server.client_builder().no_server_versions().build().await;
3712
3713        server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3714
3715        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3716        assert!(msc4028_enabled);
3717    }
3718
3719    #[async_test]
3720    async fn test_recently_visited_rooms() {
3721        // Tracking recently visited rooms requires authentication
3722        let client = MockClientBuilder::new(None).unlogged().build().await;
3723        assert_matches!(
3724            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3725            Err(Error::AuthenticationRequired)
3726        );
3727
3728        let client = MockClientBuilder::new(None).build().await;
3729        let account = client.account();
3730
3731        // We should start off with an empty list
3732        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3733
3734        // Tracking a valid room id should add it to the list
3735        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3736        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3737        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3738
3739        // And the existing list shouldn't be changed
3740        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3741        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3742
3743        // Tracking the same room again shouldn't change the list
3744        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3745        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3746        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3747
3748        // Tracking a second room should add it to the front of the list
3749        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3750        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3751        assert_eq!(
3752            account.get_recently_visited_rooms().await.unwrap(),
3753            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3754        );
3755
3756        // Tracking the first room yet again should move it to the front of the list
3757        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3758        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3759        assert_eq!(
3760            account.get_recently_visited_rooms().await.unwrap(),
3761            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3762        );
3763
3764        // Tracking should be capped at 20
3765        for n in 0..20 {
3766            account
3767                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3768                .await
3769                .unwrap();
3770        }
3771
3772        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3773
3774        // And the initial rooms should've been pushed out
3775        let rooms = account.get_recently_visited_rooms().await.unwrap();
3776        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3777        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3778
3779        // And the last tracked room should be the first
3780        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3781    }
3782
3783    #[async_test]
3784    async fn test_client_no_cycle_with_event_cache() {
3785        let client = MockClientBuilder::new(None).build().await;
3786
3787        // Wait for the init tasks to die.
3788        sleep(Duration::from_secs(1)).await;
3789
3790        let weak_client = WeakClient::from_client(&client);
3791        assert_eq!(weak_client.strong_count(), 1);
3792
3793        {
3794            let room_id = room_id!("!room:example.org");
3795
3796            // Have the client know the room.
3797            let response = SyncResponseBuilder::default()
3798                .add_joined_room(JoinedRoomBuilder::new(room_id))
3799                .build_sync_response();
3800            client.inner.base_client.receive_sync_response(response).await.unwrap();
3801
3802            client.event_cache().subscribe().unwrap();
3803
3804            let (_room_event_cache, _drop_handles) =
3805                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3806        }
3807
3808        drop(client);
3809
3810        // Give a bit of time for background tasks to die.
3811        sleep(Duration::from_secs(1)).await;
3812
3813        // The weak client must be the last reference to the client now.
3814        assert_eq!(weak_client.strong_count(), 0);
3815        let client = weak_client.get();
3816        assert!(
3817            client.is_none(),
3818            "too many strong references to the client: {}",
3819            Arc::strong_count(&client.unwrap().inner)
3820        );
3821    }
3822
3823    #[async_test]
3824    async fn test_supported_versions_caching() {
3825        let server = MatrixMockServer::new().await;
3826
3827        let versions_mock = server
3828            .mock_versions()
3829            .expect_default_access_token()
3830            .with_feature("org.matrix.e2e_cross_signing", true)
3831            .ok()
3832            .named("first versions mock")
3833            .expect(1)
3834            .mount_as_scoped()
3835            .await;
3836
3837        let memory_store = Arc::new(MemoryStore::new());
3838        let client = server
3839            .client_builder()
3840            .no_server_versions()
3841            .on_builder(|builder| {
3842                builder.store_config(
3843                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3844                        .state_store(memory_store.clone()),
3845                )
3846            })
3847            .build()
3848            .await;
3849
3850        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3851
3852        // The result was cached.
3853        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3854        // This subsequent call hits the in-memory cache.
3855        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3856
3857        drop(client);
3858
3859        let client = server
3860            .client_builder()
3861            .no_server_versions()
3862            .on_builder(|builder| {
3863                builder.store_config(
3864                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3865                        .state_store(memory_store.clone()),
3866                )
3867            })
3868            .build()
3869            .await;
3870
3871        // These calls to the new client hit the on-disk cache.
3872        assert!(
3873            client
3874                .unstable_features()
3875                .await
3876                .unwrap()
3877                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3878        );
3879        let supported = client.supported_versions().await.unwrap();
3880        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3881        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3882
3883        // Then this call hits the in-memory cache.
3884        let supported = client.supported_versions().await.unwrap();
3885        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3886        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3887
3888        drop(versions_mock);
3889
3890        // Now, reset the cache, and observe the endpoints being called again once.
3891        client.reset_supported_versions().await.unwrap();
3892
3893        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3894
3895        // Hits network again.
3896        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3897        // Hits in-memory cache again.
3898        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3899    }
3900
3901    #[async_test]
3902    async fn test_well_known_caching() {
3903        let server = MatrixMockServer::new().await;
3904        let server_url = server.uri();
3905        let domain = server_url.strip_prefix("http://").unwrap();
3906        let server_name = <&ServerName>::try_from(domain).unwrap();
3907        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3908
3909        let well_known_mock = server
3910            .mock_well_known()
3911            .ok()
3912            .named("well known mock")
3913            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3914            .mount_as_scoped()
3915            .await;
3916
3917        let memory_store = Arc::new(MemoryStore::new());
3918        let client = Client::builder()
3919            .insecure_server_name_no_tls(server_name)
3920            .store_config(
3921                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3922                    .state_store(memory_store.clone()),
3923            )
3924            .build()
3925            .await
3926            .unwrap();
3927
3928        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3929
3930        // This subsequent call hits the in-memory cache.
3931        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3932
3933        drop(client);
3934
3935        let client = server
3936            .client_builder()
3937            .no_server_versions()
3938            .on_builder(|builder| {
3939                builder.store_config(
3940                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3941                        .state_store(memory_store.clone()),
3942                )
3943            })
3944            .build()
3945            .await;
3946
3947        // This call to the new client hits the on-disk cache.
3948        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3949
3950        // Then this call hits the in-memory cache.
3951        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3952
3953        drop(well_known_mock);
3954
3955        // Now, reset the cache, and observe the endpoints being called again once.
3956        client.reset_well_known().await.unwrap();
3957
3958        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3959
3960        // Hits network again.
3961        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3962        // Hits in-memory cache again.
3963        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3964    }
3965
3966    #[async_test]
3967    async fn test_missing_well_known_caching() {
3968        let server = MatrixMockServer::new().await;
3969        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3970
3971        let well_known_mock = server
3972            .mock_well_known()
3973            .error_unrecognized()
3974            .named("first well-known mock")
3975            .expect(1)
3976            .mount_as_scoped()
3977            .await;
3978
3979        let memory_store = Arc::new(MemoryStore::new());
3980        let client = server
3981            .client_builder()
3982            .on_builder(|builder| {
3983                builder.store_config(
3984                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3985                        .state_store(memory_store.clone()),
3986                )
3987            })
3988            .build()
3989            .await;
3990
3991        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3992
3993        // This subsequent call hits the in-memory cache.
3994        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3995
3996        drop(client);
3997
3998        let client = server
3999            .client_builder()
4000            .on_builder(|builder| {
4001                builder.store_config(
4002                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4003                        .state_store(memory_store.clone()),
4004                )
4005            })
4006            .build()
4007            .await;
4008
4009        // This call to the new client hits the on-disk cache.
4010        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4011
4012        // Then this call hits the in-memory cache.
4013        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4014
4015        drop(well_known_mock);
4016
4017        // Now, reset the cache, and observe the endpoints being called again once.
4018        client.reset_well_known().await.unwrap();
4019
4020        server
4021            .mock_well_known()
4022            .error_unrecognized()
4023            .expect(1)
4024            .named("second well-known mock")
4025            .mount()
4026            .await;
4027
4028        // Hits network again.
4029        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4030        // Hits in-memory cache again.
4031        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4032    }
4033
4034    #[async_test]
4035    async fn test_no_network_doesnt_cause_infinite_retries() {
4036        // We want infinite retries for transient errors.
4037        let client = MockClientBuilder::new(None)
4038            .on_builder(|builder| builder.request_config(RequestConfig::new()))
4039            .build()
4040            .await;
4041
4042        // We don't define a mock server on purpose here, so that the error is really a
4043        // network error.
4044        client.whoami().await.unwrap_err();
4045    }
4046
4047    #[async_test]
4048    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4049        let server = MatrixMockServer::new().await;
4050        let client = server.client_builder().build().await;
4051
4052        let room_id = room_id!("!room:example.org");
4053
4054        server
4055            .mock_sync()
4056            .ok_and_run(&client, |builder| {
4057                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4058            })
4059            .await;
4060
4061        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4062        assert_eq!(room.room_id(), room_id);
4063    }
4064
4065    #[async_test]
4066    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4067        let server = MatrixMockServer::new().await;
4068        let client = server.client_builder().build().await;
4069
4070        let room_id = room_id!("!room:example.org");
4071
4072        let client = Arc::new(client);
4073
4074        // Perform the /sync request with a delay so it starts after the
4075        // `await_room_remote_echo` call has happened
4076        spawn({
4077            let client = client.clone();
4078            async move {
4079                sleep(Duration::from_millis(100)).await;
4080
4081                server
4082                    .mock_sync()
4083                    .ok_and_run(&client, |builder| {
4084                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4085                    })
4086                    .await;
4087            }
4088        });
4089
4090        let room =
4091            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4092        assert_eq!(room.room_id(), room_id);
4093    }
4094
4095    #[async_test]
4096    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4097        let client = MockClientBuilder::new(None).build().await;
4098
4099        let room_id = room_id!("!room:example.org");
4100        // Room is not present so the client won't be able to find it. The call will
4101        // timeout.
4102        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4103    }
4104
4105    #[async_test]
4106    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4107        let server = MatrixMockServer::new().await;
4108        let client = server.client_builder().build().await;
4109
4110        server.mock_create_room().ok().mount().await;
4111
4112        // Create a room in the internal store
4113        let room = client
4114            .create_room(assign!(CreateRoomRequest::new(), {
4115                invite: vec![],
4116                is_direct: false,
4117            }))
4118            .await
4119            .unwrap();
4120
4121        // Room is locally present, but not synced, the call will timeout
4122        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4123            .await
4124            .unwrap_err();
4125    }
4126
4127    #[async_test]
4128    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4129        let server = MatrixMockServer::new().await;
4130        let client = server.client_builder().build().await;
4131
4132        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4133
4134        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4135        assert_matches!(ret, Ok(true));
4136    }
4137
4138    #[async_test]
4139    async fn test_is_room_alias_available_if_alias_is_resolved() {
4140        let server = MatrixMockServer::new().await;
4141        let client = server.client_builder().build().await;
4142
4143        server
4144            .mock_room_directory_resolve_alias()
4145            .ok("!some_room_id:matrix.org", Vec::new())
4146            .expect(1)
4147            .mount()
4148            .await;
4149
4150        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4151        assert_matches!(ret, Ok(false));
4152    }
4153
4154    #[async_test]
4155    async fn test_is_room_alias_available_if_error_found() {
4156        let server = MatrixMockServer::new().await;
4157        let client = server.client_builder().build().await;
4158
4159        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4160
4161        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4162        assert_matches!(ret, Err(_));
4163    }
4164
4165    #[async_test]
4166    async fn test_create_room_alias() {
4167        let server = MatrixMockServer::new().await;
4168        let client = server.client_builder().build().await;
4169
4170        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4171
4172        let ret = client
4173            .create_room_alias(
4174                room_alias_id!("#some_alias:matrix.org"),
4175                room_id!("!some_room:matrix.org"),
4176            )
4177            .await;
4178        assert_matches!(ret, Ok(()));
4179    }
4180
4181    #[async_test]
4182    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4183        let server = MatrixMockServer::new().await;
4184        let client = server.client_builder().build().await;
4185
4186        let room_id = room_id!("!a-room:matrix.org");
4187
4188        // Make sure the summary endpoint is called once
4189        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4190
4191        // We create a locally cached invited room
4192        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4193
4194        // And we get a preview, the server endpoint was reached
4195        let preview = client
4196            .get_room_preview(room_id.into(), Vec::new())
4197            .await
4198            .expect("Room preview should be retrieved");
4199
4200        assert_eq!(invited_room.room_id(), preview.room_id);
4201    }
4202
4203    #[async_test]
4204    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4205        let server = MatrixMockServer::new().await;
4206        let client = server.client_builder().build().await;
4207
4208        let room_id = room_id!("!a-room:matrix.org");
4209
4210        // Make sure the summary endpoint is called once
4211        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4212
4213        // We create a locally cached left room
4214        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4215
4216        // And we get a preview, the server endpoint was reached
4217        let preview = client
4218            .get_room_preview(room_id.into(), Vec::new())
4219            .await
4220            .expect("Room preview should be retrieved");
4221
4222        assert_eq!(left_room.room_id(), preview.room_id);
4223    }
4224
4225    #[async_test]
4226    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4227        let server = MatrixMockServer::new().await;
4228        let client = server.client_builder().build().await;
4229
4230        let room_id = room_id!("!a-room:matrix.org");
4231
4232        // Make sure the summary endpoint is called once
4233        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4234
4235        // We create a locally cached knocked room
4236        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4237
4238        // And we get a preview, the server endpoint was reached
4239        let preview = client
4240            .get_room_preview(room_id.into(), Vec::new())
4241            .await
4242            .expect("Room preview should be retrieved");
4243
4244        assert_eq!(knocked_room.room_id(), preview.room_id);
4245    }
4246
4247    #[async_test]
4248    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4249        let server = MatrixMockServer::new().await;
4250        let client = server.client_builder().build().await;
4251
4252        let room_id = room_id!("!a-room:matrix.org");
4253
4254        // Make sure the summary endpoint is not called
4255        server.mock_room_summary().ok(room_id).never().mount().await;
4256
4257        // We create a locally cached joined room
4258        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4259
4260        // And we get a preview, no server endpoint was reached
4261        let preview = client
4262            .get_room_preview(room_id.into(), Vec::new())
4263            .await
4264            .expect("Room preview should be retrieved");
4265
4266        assert_eq!(joined_room.room_id(), preview.room_id);
4267    }
4268
4269    #[async_test]
4270    async fn test_media_preview_config() {
4271        let server = MatrixMockServer::new().await;
4272        let client = server.client_builder().build().await;
4273
4274        server
4275            .mock_sync()
4276            .ok_and_run(&client, |builder| {
4277                builder.add_custom_global_account_data(json!({
4278                    "content": {
4279                        "media_previews": "private",
4280                        "invite_avatars": "off"
4281                    },
4282                    "type": "m.media_preview_config"
4283                }));
4284            })
4285            .await;
4286
4287        let (initial_value, stream) =
4288            client.account().observe_media_preview_config().await.unwrap();
4289
4290        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4291        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4292        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4293        pin_mut!(stream);
4294        assert_pending!(stream);
4295
4296        server
4297            .mock_sync()
4298            .ok_and_run(&client, |builder| {
4299                builder.add_custom_global_account_data(json!({
4300                    "content": {
4301                        "media_previews": "off",
4302                        "invite_avatars": "on"
4303                    },
4304                    "type": "m.media_preview_config"
4305                }));
4306            })
4307            .await;
4308
4309        assert_next_matches!(
4310            stream,
4311            MediaPreviewConfigEventContent {
4312                media_previews: Some(MediaPreviews::Off),
4313                invite_avatars: Some(InviteAvatars::On),
4314                ..
4315            }
4316        );
4317        assert_pending!(stream);
4318    }
4319
4320    #[async_test]
4321    async fn test_unstable_media_preview_config() {
4322        let server = MatrixMockServer::new().await;
4323        let client = server.client_builder().build().await;
4324
4325        server
4326            .mock_sync()
4327            .ok_and_run(&client, |builder| {
4328                builder.add_custom_global_account_data(json!({
4329                    "content": {
4330                        "media_previews": "private",
4331                        "invite_avatars": "off"
4332                    },
4333                    "type": "io.element.msc4278.media_preview_config"
4334                }));
4335            })
4336            .await;
4337
4338        let (initial_value, stream) =
4339            client.account().observe_media_preview_config().await.unwrap();
4340
4341        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4342        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4343        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4344        pin_mut!(stream);
4345        assert_pending!(stream);
4346
4347        server
4348            .mock_sync()
4349            .ok_and_run(&client, |builder| {
4350                builder.add_custom_global_account_data(json!({
4351                    "content": {
4352                        "media_previews": "off",
4353                        "invite_avatars": "on"
4354                    },
4355                    "type": "io.element.msc4278.media_preview_config"
4356                }));
4357            })
4358            .await;
4359
4360        assert_next_matches!(
4361            stream,
4362            MediaPreviewConfigEventContent {
4363                media_previews: Some(MediaPreviews::Off),
4364                invite_avatars: Some(InviteAvatars::On),
4365                ..
4366            }
4367        );
4368        assert_pending!(stream);
4369    }
4370
4371    #[async_test]
4372    async fn test_media_preview_config_not_found() {
4373        let server = MatrixMockServer::new().await;
4374        let client = server.client_builder().build().await;
4375
4376        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4377
4378        assert!(initial_value.is_none());
4379    }
4380
4381    #[async_test]
4382    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4383        // The default Matrix version we use is 1.11 or higher, so authenticated media
4384        // is supported.
4385        let server = MatrixMockServer::new().await;
4386        let client = server.client_builder().build().await;
4387
4388        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4389
4390        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4391        client.load_or_fetch_max_upload_size().await.unwrap();
4392
4393        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4394    }
4395
4396    #[async_test]
4397    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4398        // The server must advertise support for the stable feature for authenticated
4399        // media support, so we mock the `GET /versions` response.
4400        let server = MatrixMockServer::new().await;
4401        let client = server.client_builder().no_server_versions().build().await;
4402
4403        server
4404            .mock_versions()
4405            .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4406            .with_feature("org.matrix.msc3916.stable", true)
4407            .ok()
4408            .named("versions")
4409            .expect(1)
4410            .mount()
4411            .await;
4412
4413        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4414
4415        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4416        client.load_or_fetch_max_upload_size().await.unwrap();
4417
4418        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4419    }
4420
4421    #[async_test]
4422    async fn test_load_or_fetch_max_upload_size_no_auth() {
4423        // The server must not support Matrix 1.11 or higher for unauthenticated
4424        // media requests, so we mock the `GET /versions` response.
4425        let server = MatrixMockServer::new().await;
4426        let client = server.client_builder().no_server_versions().build().await;
4427
4428        server
4429            .mock_versions()
4430            .with_versions(vec!["v1.1"])
4431            .ok()
4432            .named("versions")
4433            .expect(1)
4434            .mount()
4435            .await;
4436
4437        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4438
4439        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4440        client.load_or_fetch_max_upload_size().await.unwrap();
4441
4442        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4443    }
4444
4445    #[async_test]
4446    async fn test_uploading_a_too_large_media_file() {
4447        let server = MatrixMockServer::new().await;
4448        let client = server.client_builder().build().await;
4449
4450        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4451        client.load_or_fetch_max_upload_size().await.unwrap();
4452        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4453
4454        let data = vec![1, 2];
4455        let upload_request =
4456            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4457        let request = SendRequest {
4458            client: client.clone(),
4459            request: upload_request,
4460            config: None,
4461            send_progress: SharedObservable::new(TransmissionProgress::default()),
4462        };
4463        let media_request = SendMediaUploadRequest::new(request);
4464
4465        let error = media_request.await.err();
4466        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4467        assert_eq!(max, uint!(1));
4468        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4469    }
4470
4471    #[async_test]
4472    async fn test_dont_ignore_timeout_on_first_sync() {
4473        let server = MatrixMockServer::new().await;
4474        let client = server.client_builder().build().await;
4475
4476        server
4477            .mock_sync()
4478            .timeout(Some(Duration::from_secs(30)))
4479            .ok(|_| {})
4480            .mock_once()
4481            .named("sync_with_timeout")
4482            .mount()
4483            .await;
4484
4485        // Call the endpoint once to check the timeout.
4486        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4487
4488        timeout(Duration::from_secs(1), async {
4489            stream.next().await.unwrap().unwrap();
4490        })
4491        .await
4492        .unwrap();
4493    }
4494
4495    #[async_test]
4496    async fn test_ignore_timeout_on_first_sync() {
4497        let server = MatrixMockServer::new().await;
4498        let client = server.client_builder().build().await;
4499
4500        server
4501            .mock_sync()
4502            .timeout(None)
4503            .ok(|_| {})
4504            .mock_once()
4505            .named("sync_no_timeout")
4506            .mount()
4507            .await;
4508        server
4509            .mock_sync()
4510            .timeout(Some(Duration::from_secs(30)))
4511            .ok(|_| {})
4512            .mock_once()
4513            .named("sync_with_timeout")
4514            .mount()
4515            .await;
4516
4517        // Call each version of the endpoint once to check the timeouts.
4518        let mut stream = Box::pin(
4519            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4520        );
4521
4522        timeout(Duration::from_secs(1), async {
4523            stream.next().await.unwrap().unwrap();
4524            stream.next().await.unwrap().unwrap();
4525        })
4526        .await
4527        .unwrap();
4528    }
4529
4530    #[async_test]
4531    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4532        let server = MatrixMockServer::new().await;
4533        let client = server.client_builder().build().await;
4534        // This is the user ID that is inside MemberAdditional.
4535        // Note the confusing username, so we can share
4536        // GlobalAccountDataTestEvent::Direct with the invited test.
4537        let user_id = user_id!("@invited:localhost");
4538
4539        // When we receive a sync response saying "invited" is invited to a DM
4540        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4541        let response = SyncResponseBuilder::default()
4542            .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4543            .add_global_account_data(
4544                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4545            )
4546            .build_sync_response();
4547        client.base_client().receive_sync_response(response).await.unwrap();
4548
4549        // Then get_dm_room finds this room
4550        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4551        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4552    }
4553
4554    #[async_test]
4555    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4556        let server = MatrixMockServer::new().await;
4557        let client = server.client_builder().build().await;
4558        // This is the user ID that is inside MemberInvite
4559        let user_id = user_id!("@invited:localhost");
4560
4561        // When we receive a sync response saying "invited" is invited to a DM
4562        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4563        let response = SyncResponseBuilder::default()
4564            .add_joined_room(
4565                JoinedRoomBuilder::default()
4566                    .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4567            )
4568            .add_global_account_data(
4569                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4570            )
4571            .build_sync_response();
4572        client.base_client().receive_sync_response(response).await.unwrap();
4573
4574        // Then get_dm_room finds this room
4575        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4576        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4577    }
4578
4579    #[async_test]
4580    async fn test_get_dm_room_still_finds_left_room() {
4581        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4582        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4583
4584        let server = MatrixMockServer::new().await;
4585        let client = server.client_builder().build().await;
4586        // This is the user ID that is inside MemberAdditional.
4587        // Note the confusing username, so we can share
4588        // GlobalAccountDataTestEvent::Direct with the invited test.
4589        let user_id = user_id!("@invited:localhost");
4590
4591        // When we receive a sync response saying "invited" has left a DM
4592        let f = EventFactory::new().sender(user_id);
4593        let response = SyncResponseBuilder::default()
4594            .add_joined_room(
4595                JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4596            )
4597            .add_global_account_data(
4598                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4599            )
4600            .build_sync_response();
4601        client.base_client().receive_sync_response(response).await.unwrap();
4602
4603        // Then get_dm_room finds this room
4604        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4605        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4606    }
4607
4608    #[async_test]
4609    async fn test_device_exists() {
4610        let server = MatrixMockServer::new().await;
4611        let client = server.client_builder().build().await;
4612
4613        server.mock_get_device().ok().expect(1).mount().await;
4614
4615        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4616    }
4617
4618    #[async_test]
4619    async fn test_device_exists_404() {
4620        let server = MatrixMockServer::new().await;
4621        let client = server.client_builder().build().await;
4622
4623        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4624    }
4625
4626    #[async_test]
4627    async fn test_device_exists_500() {
4628        let server = MatrixMockServer::new().await;
4629        let client = server.client_builder().build().await;
4630
4631        server.mock_get_device().error500().expect(1).mount().await;
4632
4633        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4634    }
4635
4636    #[async_test]
4637    async fn test_fetching_well_known_with_homeserver_url() {
4638        let server = MatrixMockServer::new().await;
4639        let client = server.client_builder().build().await;
4640        server.mock_well_known().ok().mount().await;
4641
4642        assert_matches!(client.fetch_client_well_known().await, Some(_));
4643    }
4644
4645    #[async_test]
4646    async fn test_fetching_well_known_with_server_name() {
4647        let server = MatrixMockServer::new().await;
4648        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4649
4650        server.mock_well_known().ok().mount().await;
4651
4652        let client = MockClientBuilder::new(None)
4653            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4654            .build()
4655            .await;
4656
4657        assert_matches!(client.fetch_client_well_known().await, Some(_));
4658    }
4659
4660    #[async_test]
4661    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4662        let server = MatrixMockServer::new().await;
4663        server.mock_well_known().ok().mount().await;
4664
4665        let user_id =
4666            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4667        let client = MockClientBuilder::new(None)
4668            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4669            .build()
4670            .await;
4671
4672        assert_matches!(client.fetch_client_well_known().await, Some(_));
4673    }
4674}