Skip to main content

matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32    DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
36    StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
37    event_cache::store::EventCacheStoreLock,
38    media::store::MediaStoreLock,
39    store::{
40        DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
41        WellKnownResponse,
42    },
43    sync::{Notification, RoomUpdates},
44    task_monitor::TaskMonitor,
45};
46use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl_cache::TtlCache};
47#[cfg(feature = "e2e-encryption")]
48use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
49use ruma::{
50    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
51    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
52    api::{
53        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
54        auth_scheme::{AuthScheme, SendAccessToken},
55        client::{
56            account::whoami,
57            alias::{create_alias, delete_alias, get_alias},
58            authenticated_media,
59            device::{self, delete_devices, get_devices, update_device},
60            directory::{get_public_rooms, get_public_rooms_filtered},
61            discovery::{
62                discover_homeserver::{self, RtcFocusInfo},
63                get_capabilities::{self, v3::Capabilities},
64                get_supported_versions,
65            },
66            error::ErrorKind,
67            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
68            knock::knock_room,
69            media,
70            membership::{join_room_by_id, join_room_by_id_or_alias},
71            room::create_room,
72            session::login::v3::DiscoveryInfo,
73            sync::sync_events,
74            threads::get_thread_subscriptions_changes,
75            uiaa,
76            user_directory::search_users,
77        },
78        error::FromHttpResponseError,
79        path_builder::PathBuilder,
80    },
81    assign,
82    events::direct::DirectUserIdentifier,
83    push::Ruleset,
84    time::Instant,
85};
86use serde::de::DeserializeOwned;
87use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
88use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
89use url::Url;
90
91use self::{
92    caches::{CachedValue, ClientCaches},
93    futures::SendRequest,
94};
95use crate::{
96    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
97    Room, SessionTokens, TransmissionProgress,
98    authentication::{
99        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
100        oauth::OAuth,
101    },
102    client::thread_subscriptions::ThreadSubscriptionCatchup,
103    config::{RequestConfig, SyncToken},
104    deduplicating_handler::DeduplicatingHandler,
105    error::HttpResult,
106    event_cache::EventCache,
107    event_handler::{
108        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
109        EventHandlerStore, ObservableEventHandler, SyncEvent,
110    },
111    http_client::{HttpClient, SupportedPathBuilder},
112    latest_events::LatestEvents,
113    media::MediaError,
114    notification_settings::NotificationSettings,
115    room::RoomMember,
116    room_preview::RoomPreview,
117    send_queue::{SendQueue, SendQueueData},
118    sliding_sync::Version as SlidingSyncVersion,
119    sync::{RoomUpdate, SyncResponse},
120};
121#[cfg(feature = "e2e-encryption")]
122use crate::{
123    cross_process_lock::CrossProcessLock,
124    encryption::{
125        DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
126        VerificationState,
127    },
128};
129
130mod builder;
131pub(crate) mod caches;
132pub(crate) mod futures;
133pub(crate) mod thread_subscriptions;
134
135pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
136#[cfg(feature = "experimental-search")]
137use crate::search_index::SearchIndex;
138
139#[cfg(not(target_family = "wasm"))]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
141#[cfg(target_family = "wasm")]
142type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
143
144#[cfg(not(target_family = "wasm"))]
145type NotificationHandlerFn =
146    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
147#[cfg(target_family = "wasm")]
148type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
149
150/// Enum controlling if a loop running callbacks should continue or abort.
151///
152/// This is mainly used in the [`sync_with_callback`] method, the return value
153/// of the provided callback controls if the sync loop should be exited.
154///
155/// [`sync_with_callback`]: #method.sync_with_callback
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum LoopCtrl {
158    /// Continue running the loop.
159    Continue,
160    /// Break out of the loop.
161    Break,
162}
163
164/// Represents changes that can occur to a `Client`s `Session`.
165#[derive(Debug, Clone, PartialEq)]
166pub enum SessionChange {
167    /// The session's token is no longer valid.
168    UnknownToken {
169        /// Whether or not the session was soft logged out
170        soft_logout: bool,
171    },
172    /// The session's tokens have been refreshed.
173    TokensRefreshed,
174}
175
176/// Information about the server vendor obtained from the federation API.
177#[derive(Debug, Clone, PartialEq, Eq)]
178#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
179pub struct ServerVendorInfo {
180    /// The server name.
181    pub server_name: String,
182    /// The server version.
183    pub version: String,
184}
185
186/// An async/await enabled Matrix client.
187///
188/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
189#[derive(Clone)]
190pub struct Client {
191    pub(crate) inner: Arc<ClientInner>,
192}
193
194#[derive(Default)]
195pub(crate) struct ClientLocks {
196    /// Lock ensuring that only a single room may be marked as a DM at once.
197    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
198    /// explanation.
199    pub(crate) mark_as_dm_lock: Mutex<()>,
200
201    /// Lock ensuring that only a single secret store is getting opened at the
202    /// same time.
203    ///
204    /// This is important so we don't accidentally create multiple different new
205    /// default secret storage keys.
206    #[cfg(feature = "e2e-encryption")]
207    pub(crate) open_secret_store_lock: Mutex<()>,
208
209    /// Lock ensuring that we're only storing a single secret at a time.
210    ///
211    /// Take a look at the [`SecretStore::put_secret`] method for a more
212    /// detailed explanation.
213    ///
214    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
215    #[cfg(feature = "e2e-encryption")]
216    pub(crate) store_secret_lock: Mutex<()>,
217
218    /// Lock ensuring that only one method at a time might modify our backup.
219    #[cfg(feature = "e2e-encryption")]
220    pub(crate) backup_modify_lock: Mutex<()>,
221
222    /// Lock ensuring that we're going to attempt to upload backups for a single
223    /// requester.
224    #[cfg(feature = "e2e-encryption")]
225    pub(crate) backup_upload_lock: Mutex<()>,
226
227    /// Handler making sure we only have one group session sharing request in
228    /// flight per room.
229    #[cfg(feature = "e2e-encryption")]
230    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
231
232    /// Lock making sure we're only doing one key claim request at a time.
233    #[cfg(feature = "e2e-encryption")]
234    pub(crate) key_claim_lock: Mutex<()>,
235
236    /// Handler to ensure that only one members request is running at a time,
237    /// given a room.
238    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
239
240    /// Handler to ensure that only one encryption state request is running at a
241    /// time, given a room.
242    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
243
244    /// Deduplicating handler for sending read receipts. The string is an
245    /// internal implementation detail, see [`Self::send_single_receipt`].
246    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
247
248    #[cfg(feature = "e2e-encryption")]
249    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
250
251    /// Latest "generation" of data known by the crypto store.
252    ///
253    /// This is a counter that only increments, set in the database (and can
254    /// wrap). It's incremented whenever some process acquires a lock for the
255    /// first time. *This assumes the crypto store lock is being held, to
256    /// avoid data races on writing to this value in the store*.
257    ///
258    /// The current process will maintain this value in local memory and in the
259    /// DB over time. Observing a different value than the one read in
260    /// memory, when reading from the store indicates that somebody else has
261    /// written into the database under our feet.
262    ///
263    /// TODO: this should live in the `OlmMachine`, since it's information
264    /// related to the lock. As of today (2023-07-28), we blow up the entire
265    /// olm machine when there's a generation mismatch. So storing the
266    /// generation in the olm machine would make the client think there's
267    /// *always* a mismatch, and that's why we need to store the generation
268    /// outside the `OlmMachine`.
269    #[cfg(feature = "e2e-encryption")]
270    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
271}
272
273pub(crate) struct ClientInner {
274    /// All the data related to authentication and authorization.
275    pub(crate) auth_ctx: Arc<AuthCtx>,
276
277    /// The URL of the server.
278    ///
279    /// Not to be confused with the `Self::homeserver`. `server` is usually
280    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
281    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
282    /// homeserver (at the time of writing — 2024-08-28).
283    ///
284    /// This value is optional depending on how the `Client` has been built.
285    /// If it's been built from a homeserver URL directly, we don't know the
286    /// server. However, if the `Client` has been built from a server URL or
287    /// name, then the homeserver has been discovered, and we know both.
288    server: Option<Url>,
289
290    /// The URL of the homeserver to connect to.
291    ///
292    /// This is the URL for the client-server Matrix API.
293    homeserver: StdRwLock<Url>,
294
295    /// The sliding sync version.
296    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
297
298    /// The underlying HTTP client.
299    pub(crate) http_client: HttpClient,
300
301    /// User session data.
302    pub(super) base_client: BaseClient,
303
304    /// Collection of in-memory caches for the [`Client`].
305    pub(crate) caches: ClientCaches,
306
307    /// Collection of locks individual client methods might want to use, either
308    /// to ensure that only a single call to a method happens at once or to
309    /// deduplicate multiple calls to a method.
310    pub(crate) locks: ClientLocks,
311
312    /// The cross-process lock configuration.
313    ///
314    /// The SDK provides cross-process store locks (see
315    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
316    /// [`CrossProcessLockConfig::MultiProcess`] is used.
317    ///
318    /// If multiple `Client`s are running in different processes, this
319    /// value MUST be different for each `Client`.
320    cross_process_lock_config: CrossProcessLockConfig,
321
322    /// A mapping of the times at which the current user sent typing notices,
323    /// keyed by room.
324    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
325
326    /// Event handlers. See `add_event_handler`.
327    pub(crate) event_handlers: EventHandlerStore,
328
329    /// Notification handlers. See `register_notification_handler`.
330    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
331
332    /// The sender-side of channels used to receive room updates.
333    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
334
335    /// The sender-side of a channel used to observe all the room updates of a
336    /// sync response.
337    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
338
339    /// Whether the client should update its homeserver URL with the discovery
340    /// information present in the login response.
341    respect_login_well_known: bool,
342
343    /// An event that can be listened on to wait for a successful sync. The
344    /// event will only be fired if a sync loop is running. Can be used for
345    /// synchronization, e.g. if we send out a request to create a room, we can
346    /// wait for the sync to get the data to fetch a room object from the state
347    /// store.
348    pub(crate) sync_beat: event_listener::Event,
349
350    /// A central cache for events, inactive first.
351    ///
352    /// It becomes active when [`EventCache::subscribe`] is called.
353    pub(crate) event_cache: OnceCell<EventCache>,
354
355    /// End-to-end encryption related state.
356    #[cfg(feature = "e2e-encryption")]
357    pub(crate) e2ee: EncryptionData,
358
359    /// The verification state of our own device.
360    #[cfg(feature = "e2e-encryption")]
361    pub(crate) verification_state: SharedObservable<VerificationState>,
362
363    /// Whether to enable the experimental support for sending and receiving
364    /// encrypted room history on invite, per [MSC4268].
365    ///
366    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
367    #[cfg(feature = "e2e-encryption")]
368    pub(crate) enable_share_history_on_invite: bool,
369
370    /// Data related to the [`SendQueue`].
371    ///
372    /// [`SendQueue`]: crate::send_queue::SendQueue
373    pub(crate) send_queue_data: Arc<SendQueueData>,
374
375    /// The `max_upload_size` value of the homeserver, it contains the max
376    /// request size you can send.
377    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
378
379    /// The entry point to get the [`LatestEvent`] of rooms and threads.
380    ///
381    /// [`LatestEvent`]: crate::latest_event::LatestEvent
382    latest_events: OnceCell<LatestEvents>,
383
384    /// Service handling the catching up of thread subscriptions in the
385    /// background.
386    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
387
388    #[cfg(feature = "experimental-search")]
389    /// Handler for [`RoomIndex`]'s of each room
390    search_index: SearchIndex,
391
392    /// A monitor for background tasks spawned by the client.
393    pub(crate) task_monitor: TaskMonitor,
394
395    /// A sender to notify subscribers about duplicate key upload errors
396    /// triggered by requests to /keys/upload.
397    #[cfg(feature = "e2e-encryption")]
398    pub(crate) duplicate_key_upload_error_sender:
399        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
400}
401
402impl ClientInner {
403    /// Create a new `ClientInner`.
404    ///
405    /// All the fields passed as parameters here are those that must be cloned
406    /// upon instantiation of a sub-client, e.g. a client specialized for
407    /// notifications.
408    #[allow(clippy::too_many_arguments)]
409    async fn new(
410        auth_ctx: Arc<AuthCtx>,
411        server: Option<Url>,
412        homeserver: Url,
413        sliding_sync_version: SlidingSyncVersion,
414        http_client: HttpClient,
415        base_client: BaseClient,
416        supported_versions: CachedValue<SupportedVersions>,
417        well_known: CachedValue<Option<WellKnownResponse>>,
418        respect_login_well_known: bool,
419        event_cache: OnceCell<EventCache>,
420        send_queue: Arc<SendQueueData>,
421        latest_events: OnceCell<LatestEvents>,
422        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
423        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
424        cross_process_lock_config: CrossProcessLockConfig,
425        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
426        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
427    ) -> Arc<Self> {
428        let caches = ClientCaches {
429            supported_versions: supported_versions.into(),
430            well_known: well_known.into(),
431            server_metadata: Mutex::new(TtlCache::new()),
432        };
433
434        let client = Self {
435            server,
436            homeserver: StdRwLock::new(homeserver),
437            auth_ctx,
438            sliding_sync_version: StdRwLock::new(sliding_sync_version),
439            http_client,
440            base_client,
441            caches,
442            locks: Default::default(),
443            cross_process_lock_config,
444            typing_notice_times: Default::default(),
445            event_handlers: Default::default(),
446            notification_handlers: Default::default(),
447            room_update_channels: Default::default(),
448            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
449            // ballast for all observers to catch up.
450            room_updates_sender: broadcast::Sender::new(32),
451            respect_login_well_known,
452            sync_beat: event_listener::Event::new(),
453            event_cache,
454            send_queue_data: send_queue,
455            latest_events,
456            #[cfg(feature = "e2e-encryption")]
457            e2ee: EncryptionData::new(encryption_settings),
458            #[cfg(feature = "e2e-encryption")]
459            verification_state: SharedObservable::new(VerificationState::Unknown),
460            #[cfg(feature = "e2e-encryption")]
461            enable_share_history_on_invite,
462            server_max_upload_size: Mutex::new(OnceCell::new()),
463            #[cfg(feature = "experimental-search")]
464            search_index: search_index_handler,
465            thread_subscription_catchup,
466            task_monitor: TaskMonitor::new(),
467            #[cfg(feature = "e2e-encryption")]
468            duplicate_key_upload_error_sender: broadcast::channel(1).0,
469        };
470
471        #[allow(clippy::let_and_return)]
472        let client = Arc::new(client);
473
474        #[cfg(feature = "e2e-encryption")]
475        client.e2ee.initialize_tasks(&client);
476
477        let _ = client
478            .event_cache
479            .get_or_init(|| async {
480                EventCache::new(&client, client.base_client.event_cache_store().clone())
481            })
482            .await;
483
484        let _ = client
485            .thread_subscription_catchup
486            .get_or_init(|| async {
487                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
488            })
489            .await;
490
491        client
492    }
493}
494
495#[cfg(not(tarpaulin_include))]
496impl Debug for Client {
497    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
498        write!(fmt, "Client")
499    }
500}
501
502impl Client {
503    /// Create a new [`Client`] that will use the given homeserver.
504    ///
505    /// # Arguments
506    ///
507    /// * `homeserver_url` - The homeserver that the client should connect to.
508    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
509        Self::builder().homeserver_url(homeserver_url).build().await
510    }
511
512    /// Returns a subscriber that publishes an event every time the ignore user
513    /// list changes.
514    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
515        self.inner.base_client.subscribe_to_ignore_user_list_changes()
516    }
517
518    /// Create a new [`ClientBuilder`].
519    pub fn builder() -> ClientBuilder {
520        ClientBuilder::new()
521    }
522
523    pub(crate) fn base_client(&self) -> &BaseClient {
524        &self.inner.base_client
525    }
526
527    /// The underlying HTTP client.
528    pub fn http_client(&self) -> &reqwest::Client {
529        &self.inner.http_client.inner
530    }
531
532    pub(crate) fn locks(&self) -> &ClientLocks {
533        &self.inner.locks
534    }
535
536    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
537        &self.inner.auth_ctx
538    }
539
540    /// The cross-process store lock configuration used by this [`Client`].
541    ///
542    /// The SDK provides cross-process store locks (see
543    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
544    /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
545    /// the value used for all cross-process store locks used by this
546    /// `Client`.
547    pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
548        &self.inner.cross_process_lock_config
549    }
550
551    /// Change the homeserver URL used by this client.
552    ///
553    /// # Arguments
554    ///
555    /// * `homeserver_url` - The new URL to use.
556    fn set_homeserver(&self, homeserver_url: Url) {
557        *self.inner.homeserver.write().unwrap() = homeserver_url;
558    }
559
560    /// Change to a different homeserver and re-resolve well-known.
561    #[cfg(feature = "e2e-encryption")]
562    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
563        &self,
564        homeserver_url: Url,
565    ) -> Result<()> {
566        self.set_homeserver(homeserver_url);
567        self.reset_well_known().await?;
568        if let Some(well_known) = self.load_or_fetch_well_known().await? {
569            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
570        }
571        Ok(())
572    }
573
574    /// Get the capabilities of the homeserver.
575    ///
576    /// This method should be used to check what features are supported by the
577    /// homeserver.
578    ///
579    /// # Examples
580    ///
581    /// ```no_run
582    /// # use matrix_sdk::Client;
583    /// # use url::Url;
584    /// # async {
585    /// # let homeserver = Url::parse("http://example.com")?;
586    /// let client = Client::new(homeserver).await?;
587    ///
588    /// let capabilities = client.get_capabilities().await?;
589    ///
590    /// if capabilities.change_password.enabled {
591    ///     // Change password
592    /// }
593    /// # anyhow::Ok(()) };
594    /// ```
595    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
596        let res = self.send(get_capabilities::v3::Request::new()).await?;
597        Ok(res.capabilities)
598    }
599
600    /// Get the server vendor information from the federation API.
601    ///
602    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
603    /// both the server's software name and version.
604    ///
605    /// # Examples
606    ///
607    /// ```no_run
608    /// # use matrix_sdk::Client;
609    /// # use url::Url;
610    /// # async {
611    /// # let homeserver = Url::parse("http://example.com")?;
612    /// let client = Client::new(homeserver).await?;
613    ///
614    /// let server_info = client.server_vendor_info(None).await?;
615    /// println!(
616    ///     "Server: {}, Version: {}",
617    ///     server_info.server_name, server_info.version
618    /// );
619    /// # anyhow::Ok(()) };
620    /// ```
621    #[cfg(feature = "federation-api")]
622    pub async fn server_vendor_info(
623        &self,
624        request_config: Option<RequestConfig>,
625    ) -> HttpResult<ServerVendorInfo> {
626        use ruma::api::federation::discovery::get_server_version;
627
628        let res = self
629            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
630            .await?;
631
632        // Extract server info, using defaults if fields are missing.
633        let server = res.server.unwrap_or_default();
634        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
635        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
636
637        Ok(ServerVendorInfo { server_name: server_name_str, version })
638    }
639
640    /// Get a copy of the default request config.
641    ///
642    /// The default request config is what's used when sending requests if no
643    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
644    /// function with such a parameter.
645    ///
646    /// If the default request config was not customized through
647    /// [`ClientBuilder`] when creating this `Client`, the returned value will
648    /// be equivalent to [`RequestConfig::default()`].
649    pub fn request_config(&self) -> RequestConfig {
650        self.inner.http_client.request_config
651    }
652
653    /// Check whether the client has been activated.
654    ///
655    /// A client is considered active when:
656    ///
657    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
658    ///    is logged in,
659    /// 2. Has loaded cached data from storage,
660    /// 3. If encryption is enabled, it also initialized or restored its
661    ///    `OlmMachine`.
662    pub fn is_active(&self) -> bool {
663        self.inner.base_client.is_active()
664    }
665
666    /// The server used by the client.
667    ///
668    /// See `Self::server` to learn more.
669    pub fn server(&self) -> Option<&Url> {
670        self.inner.server.as_ref()
671    }
672
673    /// The homeserver of the client.
674    pub fn homeserver(&self) -> Url {
675        self.inner.homeserver.read().unwrap().clone()
676    }
677
678    /// Get the sliding sync version.
679    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
680        self.inner.sliding_sync_version.read().unwrap().clone()
681    }
682
683    /// Override the sliding sync version.
684    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
685        let mut lock = self.inner.sliding_sync_version.write().unwrap();
686        *lock = version;
687    }
688
689    /// Get the Matrix user session meta information.
690    ///
691    /// If the client is currently logged in, this will return a
692    /// [`SessionMeta`] object which contains the user ID and device ID.
693    /// Otherwise it returns `None`.
694    pub fn session_meta(&self) -> Option<&SessionMeta> {
695        self.base_client().session_meta()
696    }
697
698    /// Returns a receiver that gets events for each room info update. To watch
699    /// for new events, use `receiver.resubscribe()`.
700    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
701        self.base_client().room_info_notable_update_receiver()
702    }
703
704    /// Performs a search for users.
705    /// The search is performed case-insensitively on user IDs and display names
706    ///
707    /// # Arguments
708    ///
709    /// * `search_term` - The search term for the search
710    /// * `limit` - The maximum number of results to return. Defaults to 10.
711    ///
712    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
713    pub async fn search_users(
714        &self,
715        search_term: &str,
716        limit: u64,
717    ) -> HttpResult<search_users::v3::Response> {
718        let mut request = search_users::v3::Request::new(search_term.to_owned());
719
720        if let Some(limit) = UInt::new(limit) {
721            request.limit = limit;
722        }
723
724        self.send(request).await
725    }
726
727    /// Get the user id of the current owner of the client.
728    pub fn user_id(&self) -> Option<&UserId> {
729        self.session_meta().map(|s| s.user_id.as_ref())
730    }
731
732    /// Get the device ID that identifies the current session.
733    pub fn device_id(&self) -> Option<&DeviceId> {
734        self.session_meta().map(|s| s.device_id.as_ref())
735    }
736
737    /// Get the current access token for this session.
738    ///
739    /// Will be `None` if the client has not been logged in.
740    pub fn access_token(&self) -> Option<String> {
741        self.auth_ctx().access_token()
742    }
743
744    /// Get the current tokens for this session.
745    ///
746    /// To be notified of changes in the session tokens, use
747    /// [`Client::subscribe_to_session_changes()`] or
748    /// [`Client::set_session_callbacks()`].
749    ///
750    /// Returns `None` if the client has not been logged in.
751    pub fn session_tokens(&self) -> Option<SessionTokens> {
752        self.auth_ctx().session_tokens()
753    }
754
755    /// Access the authentication API used to log in this client.
756    ///
757    /// Will be `None` if the client has not been logged in.
758    pub fn auth_api(&self) -> Option<AuthApi> {
759        match self.auth_ctx().auth_data.get()? {
760            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
761            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
762        }
763    }
764
765    /// Get the whole session info of this client.
766    ///
767    /// Will be `None` if the client has not been logged in.
768    ///
769    /// Can be used with [`Client::restore_session`] to restore a previously
770    /// logged-in session.
771    pub fn session(&self) -> Option<AuthSession> {
772        match self.auth_api()? {
773            AuthApi::Matrix(api) => api.session().map(Into::into),
774            AuthApi::OAuth(api) => api.full_session().map(Into::into),
775        }
776    }
777
778    /// Get a reference to the state store.
779    pub fn state_store(&self) -> &DynStateStore {
780        self.base_client().state_store()
781    }
782
783    /// Get a reference to the event cache store.
784    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
785        self.base_client().event_cache_store()
786    }
787
788    /// Get a reference to the media store.
789    pub fn media_store(&self) -> &MediaStoreLock {
790        self.base_client().media_store()
791    }
792
793    /// Access the native Matrix authentication API with this client.
794    pub fn matrix_auth(&self) -> MatrixAuth {
795        MatrixAuth::new(self.clone())
796    }
797
798    /// Get the account of the current owner of the client.
799    pub fn account(&self) -> Account {
800        Account::new(self.clone())
801    }
802
803    /// Get the encryption manager of the client.
804    #[cfg(feature = "e2e-encryption")]
805    pub fn encryption(&self) -> Encryption {
806        Encryption::new(self.clone())
807    }
808
809    /// Get the media manager of the client.
810    pub fn media(&self) -> Media {
811        Media::new(self.clone())
812    }
813
814    /// Get the pusher manager of the client.
815    pub fn pusher(&self) -> Pusher {
816        Pusher::new(self.clone())
817    }
818
819    /// Access the OAuth 2.0 API of the client.
820    pub fn oauth(&self) -> OAuth {
821        OAuth::new(self.clone())
822    }
823
824    /// Register a handler for a specific event type.
825    ///
826    /// The handler is a function or closure with one or more arguments. The
827    /// first argument is the event itself. All additional arguments are
828    /// "context" arguments: They have to implement [`EventHandlerContext`].
829    /// This trait is named that way because most of the types implementing it
830    /// give additional context about an event: The room it was in, its raw form
831    /// and other similar things. As two exceptions to this,
832    /// [`Client`] and [`EventHandlerHandle`] also implement the
833    /// `EventHandlerContext` trait so you don't have to clone your client
834    /// into the event handler manually and a handler can decide to remove
835    /// itself.
836    ///
837    /// Some context arguments are not universally applicable. A context
838    /// argument that isn't available for the given event type will result in
839    /// the event handler being skipped and an error being logged. The following
840    /// context argument types are only available for a subset of event types:
841    ///
842    /// * [`Room`] is only available for room-specific events, i.e. not for
843    ///   events like global account data events or presence events.
844    ///
845    /// You can provide custom context via
846    /// [`add_event_handler_context`](Client::add_event_handler_context) and
847    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
848    /// into the event handler.
849    ///
850    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
851    ///
852    /// # Examples
853    ///
854    /// ```no_run
855    /// use matrix_sdk::{
856    ///     deserialized_responses::EncryptionInfo,
857    ///     event_handler::Ctx,
858    ///     ruma::{
859    ///         events::{
860    ///             macros::EventContent,
861    ///             push_rules::PushRulesEvent,
862    ///             room::{
863    ///                 message::SyncRoomMessageEvent,
864    ///                 topic::SyncRoomTopicEvent,
865    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
866    ///             },
867    ///         },
868    ///         push::Action,
869    ///         Int, MilliSecondsSinceUnixEpoch,
870    ///     },
871    ///     Client, Room,
872    /// };
873    /// use serde::{Deserialize, Serialize};
874    ///
875    /// # async fn example(client: Client) {
876    /// client.add_event_handler(
877    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
878    ///         // Common usage: Room event plus room and client.
879    ///     },
880    /// );
881    /// client.add_event_handler(
882    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
883    ///         async move {
884    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
885    ///             // unencrypted events and events that were decrypted by the SDK.
886    ///         }
887    ///     },
888    /// );
889    /// client.add_event_handler(
890    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
891    ///         async move {
892    ///             // A `Vec<Action>` parameter allows you to know which push actions
893    ///             // are applicable for an event. For example, an event with
894    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
895    ///             // in the timeline.
896    ///         }
897    ///     },
898    /// );
899    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
900    ///     // You can omit any or all arguments after the first.
901    /// });
902    ///
903    /// // Registering a temporary event handler:
904    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
905    ///     /* Event handler */
906    /// });
907    /// client.remove_event_handler(handle);
908    ///
909    /// // Registering custom event handler context:
910    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
911    /// struct MyContext {
912    ///     number: usize,
913    /// }
914    /// client.add_event_handler_context(MyContext { number: 5 });
915    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
916    ///     // Use the context
917    /// });
918    ///
919    /// // This will handle membership events in joined rooms. Invites are special, see below.
920    /// client.add_event_handler(
921    ///     |ev: SyncRoomMemberEvent| async move {},
922    /// );
923    ///
924    /// // To handle state events in invited rooms (including invite membership events),
925    /// // `StrippedRoomMemberEvent` should be used.
926    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
927    /// client.add_event_handler(
928    ///     |ev: StrippedRoomMemberEvent| async move {},
929    /// );
930    ///
931    /// // Custom events work exactly the same way, you just need to declare
932    /// // the content struct and use the EventContent derive macro on it.
933    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
934    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
935    /// struct TokenEventContent {
936    ///     token: String,
937    ///     #[serde(rename = "exp")]
938    ///     expires_at: MilliSecondsSinceUnixEpoch,
939    /// }
940    ///
941    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
942    ///     todo!("Display the token");
943    /// });
944    ///
945    /// // Event handler closures can also capture local variables.
946    /// // Make sure they are cheap to clone though, because they will be cloned
947    /// // every time the closure is called.
948    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
949    ///
950    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
951    ///     println!("Calling the handler with identifier {data}");
952    /// });
953    /// # }
954    /// ```
955    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
956    where
957        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
958        H: EventHandler<Ev, Ctx>,
959    {
960        self.add_event_handler_impl(handler, None)
961    }
962
963    /// Register a handler for a specific room, and event type.
964    ///
965    /// This method works the same way as
966    /// [`add_event_handler`][Self::add_event_handler], except that the handler
967    /// will only be called for events in the room with the specified ID. See
968    /// that method for more details on event handler functions.
969    ///
970    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
971    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
972    /// your use case.
973    pub fn add_room_event_handler<Ev, Ctx, H>(
974        &self,
975        room_id: &RoomId,
976        handler: H,
977    ) -> EventHandlerHandle
978    where
979        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
980        H: EventHandler<Ev, Ctx>,
981    {
982        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
983    }
984
985    /// Observe a specific event type.
986    ///
987    /// `Ev` represents the kind of event that will be observed. `Ctx`
988    /// represents the context that will come with the event. It relies on the
989    /// same mechanism as [`Client::add_event_handler`]. The main difference is
990    /// that it returns an [`ObservableEventHandler`] and doesn't require a
991    /// user-defined closure. It is possible to subscribe to the
992    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
993    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
994    /// Ctx)`.
995    ///
996    /// Be careful that only the most recent value can be observed. Subscribers
997    /// are notified when a new value is sent, but there is no guarantee
998    /// that they will see all values.
999    ///
1000    /// # Example
1001    ///
1002    /// Let's see a classical usage:
1003    ///
1004    /// ```
1005    /// use futures_util::StreamExt as _;
1006    /// use matrix_sdk::{
1007    ///     Client, Room,
1008    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
1009    /// };
1010    ///
1011    /// # async fn example(client: Client) -> Option<()> {
1012    /// let observer =
1013    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1014    ///
1015    /// let mut subscriber = observer.subscribe();
1016    ///
1017    /// let (event, (room, push_actions)) = subscriber.next().await?;
1018    /// # Some(())
1019    /// # }
1020    /// ```
1021    ///
1022    /// Now let's see how to get several contexts that can be useful for you:
1023    ///
1024    /// ```
1025    /// use matrix_sdk::{
1026    ///     Client, Room,
1027    ///     deserialized_responses::EncryptionInfo,
1028    ///     ruma::{
1029    ///         events::room::{
1030    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1031    ///         },
1032    ///         push::Action,
1033    ///     },
1034    /// };
1035    ///
1036    /// # async fn example(client: Client) {
1037    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1038    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1039    ///
1040    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1041    /// // to distinguish between unencrypted events and events that were decrypted
1042    /// // by the SDK.
1043    /// let _ = client
1044    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1045    ///     );
1046    ///
1047    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1048    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1049    /// // should be highlighted in the timeline.
1050    /// let _ =
1051    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1052    ///
1053    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1054    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1055    /// # }
1056    /// ```
1057    ///
1058    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1059    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1060    where
1061        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1062        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1063    {
1064        self.observe_room_events_impl(None)
1065    }
1066
1067    /// Observe a specific room, and event type.
1068    ///
1069    /// This method works the same way as [`Client::observe_events`], except
1070    /// that the observability will only be applied for events in the room with
1071    /// the specified ID. See that method for more details.
1072    ///
1073    /// Be careful that only the most recent value can be observed. Subscribers
1074    /// are notified when a new value is sent, but there is no guarantee
1075    /// that they will see all values.
1076    pub fn observe_room_events<Ev, Ctx>(
1077        &self,
1078        room_id: &RoomId,
1079    ) -> ObservableEventHandler<(Ev, Ctx)>
1080    where
1081        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1082        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1083    {
1084        self.observe_room_events_impl(Some(room_id.to_owned()))
1085    }
1086
1087    /// Shared implementation for `Client::observe_events` and
1088    /// `Client::observe_room_events`.
1089    fn observe_room_events_impl<Ev, Ctx>(
1090        &self,
1091        room_id: Option<OwnedRoomId>,
1092    ) -> ObservableEventHandler<(Ev, Ctx)>
1093    where
1094        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1095        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1096    {
1097        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1098        // new value.
1099        let shared_observable = SharedObservable::new(None);
1100
1101        ObservableEventHandler::new(
1102            shared_observable.clone(),
1103            self.event_handler_drop_guard(self.add_event_handler_impl(
1104                move |event: Ev, context: Ctx| {
1105                    shared_observable.set(Some((event, context)));
1106
1107                    ready(())
1108                },
1109                room_id,
1110            )),
1111        )
1112    }
1113
1114    /// Remove the event handler associated with the handle.
1115    ///
1116    /// Note that you **must not** call `remove_event_handler` from the
1117    /// non-async part of an event handler, that is:
1118    ///
1119    /// ```ignore
1120    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1121    ///     // ⚠ this will cause a deadlock ⚠
1122    ///     client.remove_event_handler(handle);
1123    ///
1124    ///     async move {
1125    ///         // removing the event handler here is fine
1126    ///         client.remove_event_handler(handle);
1127    ///     }
1128    /// })
1129    /// ```
1130    ///
1131    /// Note also that handlers that remove themselves will still execute with
1132    /// events received in the same sync cycle.
1133    ///
1134    /// # Arguments
1135    ///
1136    /// `handle` - The [`EventHandlerHandle`] that is returned when
1137    /// registering the event handler with [`Client::add_event_handler`].
1138    ///
1139    /// # Examples
1140    ///
1141    /// ```no_run
1142    /// # use url::Url;
1143    /// # use tokio::sync::mpsc;
1144    /// #
1145    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1146    /// #
1147    /// use matrix_sdk::{
1148    ///     Client, event_handler::EventHandlerHandle,
1149    ///     ruma::events::room::member::SyncRoomMemberEvent,
1150    /// };
1151    /// #
1152    /// # futures_executor::block_on(async {
1153    /// # let client = matrix_sdk::Client::builder()
1154    /// #     .homeserver_url(homeserver)
1155    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1156    /// #     .build()
1157    /// #     .await
1158    /// #     .unwrap();
1159    ///
1160    /// client.add_event_handler(
1161    ///     |ev: SyncRoomMemberEvent,
1162    ///      client: Client,
1163    ///      handle: EventHandlerHandle| async move {
1164    ///         // Common usage: Check arriving Event is the expected one
1165    ///         println!("Expected RoomMemberEvent received!");
1166    ///         client.remove_event_handler(handle);
1167    ///     },
1168    /// );
1169    /// # });
1170    /// ```
1171    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1172        self.inner.event_handlers.remove(handle);
1173    }
1174
1175    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1176    /// the given handle.
1177    ///
1178    /// When the returned value is dropped, the event handler will be removed.
1179    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1180        EventHandlerDropGuard::new(handle, self.clone())
1181    }
1182
1183    /// Add an arbitrary value for use as event handler context.
1184    ///
1185    /// The value can be obtained in an event handler by adding an argument of
1186    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1187    ///
1188    /// If a value of the same type has been added before, it will be
1189    /// overwritten.
1190    ///
1191    /// # Examples
1192    ///
1193    /// ```no_run
1194    /// use matrix_sdk::{
1195    ///     Room, event_handler::Ctx,
1196    ///     ruma::events::room::message::SyncRoomMessageEvent,
1197    /// };
1198    /// # #[derive(Clone)]
1199    /// # struct SomeType;
1200    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1201    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1202    /// # futures_executor::block_on(async {
1203    /// # let client = matrix_sdk::Client::builder()
1204    /// #     .homeserver_url(homeserver)
1205    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1206    /// #     .build()
1207    /// #     .await
1208    /// #     .unwrap();
1209    ///
1210    /// // Handle used to send messages to the UI part of the app
1211    /// let my_gui_handle: SomeType = obtain_gui_handle();
1212    ///
1213    /// client.add_event_handler_context(my_gui_handle.clone());
1214    /// client.add_event_handler(
1215    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1216    ///         async move {
1217    ///             // gui_handle.send(DisplayMessage { message: ev });
1218    ///         }
1219    ///     },
1220    /// );
1221    /// # });
1222    /// ```
1223    pub fn add_event_handler_context<T>(&self, ctx: T)
1224    where
1225        T: Clone + Send + Sync + 'static,
1226    {
1227        self.inner.event_handlers.add_context(ctx);
1228    }
1229
1230    /// Register a handler for a notification.
1231    ///
1232    /// Similar to [`Client::add_event_handler`], but only allows functions
1233    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1234    /// [`Client`] for now.
1235    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1236    where
1237        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1238        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1239    {
1240        self.inner.notification_handlers.write().await.push(Box::new(
1241            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1242        ));
1243
1244        self
1245    }
1246
1247    /// Subscribe to all updates for the room with the given ID.
1248    ///
1249    /// The returned receiver will receive a new message for each sync response
1250    /// that contains updates for that room.
1251    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1252        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1253            btree_map::Entry::Vacant(entry) => {
1254                let (tx, rx) = broadcast::channel(8);
1255                entry.insert(tx);
1256                rx
1257            }
1258            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1259        }
1260    }
1261
1262    /// Subscribe to all updates to all rooms, whenever any has been received in
1263    /// a sync response.
1264    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1265        self.inner.room_updates_sender.subscribe()
1266    }
1267
1268    pub(crate) async fn notification_handlers(
1269        &self,
1270    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1271        self.inner.notification_handlers.read().await
1272    }
1273
1274    /// Get all the rooms the client knows about.
1275    ///
1276    /// This will return the list of joined, invited, and left rooms.
1277    pub fn rooms(&self) -> Vec<Room> {
1278        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1279    }
1280
1281    /// Get all the rooms the client knows about, filtered by room state.
1282    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1283        self.base_client()
1284            .rooms_filtered(filter)
1285            .into_iter()
1286            .map(|room| Room::new(self.clone(), room))
1287            .collect()
1288    }
1289
1290    /// Get a stream of all the rooms, in addition to the existing rooms.
1291    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1292        let (rooms, stream) = self.base_client().rooms_stream();
1293
1294        let map_room = |room| Room::new(self.clone(), room);
1295
1296        (
1297            rooms.into_iter().map(map_room).collect(),
1298            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1299        )
1300    }
1301
1302    /// Returns the joined rooms this client knows about.
1303    pub fn joined_rooms(&self) -> Vec<Room> {
1304        self.rooms_filtered(RoomStateFilter::JOINED)
1305    }
1306
1307    /// Returns the invited rooms this client knows about.
1308    pub fn invited_rooms(&self) -> Vec<Room> {
1309        self.rooms_filtered(RoomStateFilter::INVITED)
1310    }
1311
1312    /// Returns the left rooms this client knows about.
1313    pub fn left_rooms(&self) -> Vec<Room> {
1314        self.rooms_filtered(RoomStateFilter::LEFT)
1315    }
1316
1317    /// Returns the joined space rooms this client knows about.
1318    pub fn joined_space_rooms(&self) -> Vec<Room> {
1319        self.base_client()
1320            .rooms_filtered(RoomStateFilter::JOINED)
1321            .into_iter()
1322            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1323            .collect()
1324    }
1325
1326    /// Get a room with the given room id.
1327    ///
1328    /// # Arguments
1329    ///
1330    /// `room_id` - The unique id of the room that should be fetched.
1331    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1332        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1333    }
1334
1335    /// Gets the preview of a room, whether the current user has joined it or
1336    /// not.
1337    pub async fn get_room_preview(
1338        &self,
1339        room_or_alias_id: &RoomOrAliasId,
1340        via: Vec<OwnedServerName>,
1341    ) -> Result<RoomPreview> {
1342        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1343            Ok(room_id) => room_id.to_owned(),
1344            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1345        };
1346
1347        if let Some(room) = self.get_room(&room_id) {
1348            // The cached data can only be trusted if the room state is joined or
1349            // banned: for invite and knock rooms, no updates will be received
1350            // for the rooms after the invite/knock action took place so we may
1351            // have very out to date data for important fields such as
1352            // `join_rule`. For left rooms, the homeserver should return the latest info.
1353            match room.state() {
1354                RoomState::Joined | RoomState::Banned => {
1355                    return Ok(RoomPreview::from_known_room(&room).await);
1356                }
1357                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1358            }
1359        }
1360
1361        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1362    }
1363
1364    /// Resolve a room alias to a room id and a list of servers which know
1365    /// about it.
1366    ///
1367    /// # Arguments
1368    ///
1369    /// `room_alias` - The room alias to be resolved.
1370    pub async fn resolve_room_alias(
1371        &self,
1372        room_alias: &RoomAliasId,
1373    ) -> HttpResult<get_alias::v3::Response> {
1374        let request = get_alias::v3::Request::new(room_alias.to_owned());
1375        self.send(request).await
1376    }
1377
1378    /// Checks if a room alias is not in use yet.
1379    ///
1380    /// Returns:
1381    /// - `Ok(true)` if the room alias is available.
1382    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1383    ///   status code).
1384    /// - An `Err` otherwise.
1385    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1386        match self.resolve_room_alias(alias).await {
1387            // The room alias was resolved, so it's already in use.
1388            Ok(_) => Ok(false),
1389            Err(error) => {
1390                match error.client_api_error_kind() {
1391                    // The room alias wasn't found, so it's available.
1392                    Some(ErrorKind::NotFound) => Ok(true),
1393                    _ => Err(error),
1394                }
1395            }
1396        }
1397    }
1398
1399    /// Adds a new room alias associated with a room to the room directory.
1400    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1401        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1402        self.send(request).await?;
1403        Ok(())
1404    }
1405
1406    /// Removes a room alias from the room directory.
1407    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1408        let request = delete_alias::v3::Request::new(alias.to_owned());
1409        self.send(request).await?;
1410        Ok(())
1411    }
1412
1413    /// Update the homeserver from the login response well-known if needed.
1414    ///
1415    /// # Arguments
1416    ///
1417    /// * `login_well_known` - The `well_known` field from a successful login
1418    ///   response.
1419    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1420        if self.inner.respect_login_well_known
1421            && let Some(well_known) = login_well_known
1422            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1423        {
1424            self.set_homeserver(homeserver);
1425        }
1426    }
1427
1428    /// Similar to [`Client::restore_session_with`], with
1429    /// [`RoomLoadSettings::default()`].
1430    ///
1431    /// # Panics
1432    ///
1433    /// Panics if a session was already restored or logged in.
1434    #[instrument(skip_all)]
1435    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1436        self.restore_session_with(session, RoomLoadSettings::default()).await
1437    }
1438
1439    /// Restore a session previously logged-in using one of the available
1440    /// authentication APIs. The number of rooms to restore is controlled by
1441    /// [`RoomLoadSettings`].
1442    ///
1443    /// See the documentation of the corresponding authentication API's
1444    /// `restore_session` method for more information.
1445    ///
1446    /// # Panics
1447    ///
1448    /// Panics if a session was already restored or logged in.
1449    #[instrument(skip_all)]
1450    pub async fn restore_session_with(
1451        &self,
1452        session: impl Into<AuthSession>,
1453        room_load_settings: RoomLoadSettings,
1454    ) -> Result<()> {
1455        let session = session.into();
1456        match session {
1457            AuthSession::Matrix(session) => {
1458                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1459            }
1460            AuthSession::OAuth(session) => {
1461                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1462            }
1463        }
1464    }
1465
1466    /// Refresh the access token using the authentication API used to log into
1467    /// this session.
1468    ///
1469    /// See the documentation of the authentication API's `refresh_access_token`
1470    /// method for more information.
1471    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1472        let Some(auth_api) = self.auth_api() else {
1473            return Err(RefreshTokenError::RefreshTokenRequired);
1474        };
1475
1476        match auth_api {
1477            AuthApi::Matrix(api) => {
1478                trace!("Token refresh: Using the homeserver.");
1479                Box::pin(api.refresh_access_token()).await?;
1480            }
1481            AuthApi::OAuth(api) => {
1482                trace!("Token refresh: Using OAuth 2.0.");
1483                Box::pin(api.refresh_access_token()).await?;
1484            }
1485        }
1486
1487        Ok(())
1488    }
1489
1490    /// Log out the current session using the proper authentication API.
1491    ///
1492    /// # Errors
1493    ///
1494    /// Returns an error if the session is not authenticated or if an error
1495    /// occurred while making the request to the server.
1496    pub async fn logout(&self) -> Result<(), Error> {
1497        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1498        match auth_api {
1499            AuthApi::Matrix(matrix_auth) => {
1500                matrix_auth.logout().await?;
1501                Ok(())
1502            }
1503            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1504        }
1505    }
1506
1507    /// Get or upload a sync filter.
1508    ///
1509    /// This method will either get a filter ID from the store or upload the
1510    /// filter definition to the homeserver and return the new filter ID.
1511    ///
1512    /// # Arguments
1513    ///
1514    /// * `filter_name` - The unique name of the filter, this name will be used
1515    /// locally to store and identify the filter ID returned by the server.
1516    ///
1517    /// * `definition` - The filter definition that should be uploaded to the
1518    /// server if no filter ID can be found in the store.
1519    ///
1520    /// # Examples
1521    ///
1522    /// ```no_run
1523    /// # use matrix_sdk::{
1524    /// #    Client, config::SyncSettings,
1525    /// #    ruma::api::client::{
1526    /// #        filter::{
1527    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1528    /// #        },
1529    /// #        sync::sync_events::v3::Filter,
1530    /// #    }
1531    /// # };
1532    /// # use url::Url;
1533    /// # async {
1534    /// # let homeserver = Url::parse("http://example.com").unwrap();
1535    /// # let client = Client::new(homeserver).await.unwrap();
1536    /// let mut filter = FilterDefinition::default();
1537    ///
1538    /// // Let's enable member lazy loading.
1539    /// filter.room.state.lazy_load_options =
1540    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1541    ///
1542    /// let filter_id = client
1543    ///     .get_or_upload_filter("sync", filter)
1544    ///     .await
1545    ///     .unwrap();
1546    ///
1547    /// let sync_settings = SyncSettings::new()
1548    ///     .filter(Filter::FilterId(filter_id));
1549    ///
1550    /// let response = client.sync_once(sync_settings).await.unwrap();
1551    /// # };
1552    #[instrument(skip(self, definition))]
1553    pub async fn get_or_upload_filter(
1554        &self,
1555        filter_name: &str,
1556        definition: FilterDefinition,
1557    ) -> Result<String> {
1558        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1559            debug!("Found filter locally");
1560            Ok(filter)
1561        } else {
1562            debug!("Didn't find filter locally");
1563            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1564            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1565            let response = self.send(request).await?;
1566
1567            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1568
1569            Ok(response.filter_id)
1570        }
1571    }
1572
1573    /// Prepare to join a room by ID, by getting the current details about it
1574    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1575        let room = self.get_room(room_id)?;
1576
1577        let inviter = match room.invite_details().await {
1578            Ok(details) => details.inviter,
1579            Err(Error::WrongRoomState(_)) => None,
1580            Err(e) => {
1581                warn!("Error fetching invite details for room: {e:?}");
1582                None
1583            }
1584        };
1585
1586        Some(PreJoinRoomInfo { inviter })
1587    }
1588
1589    /// Finish joining a room.
1590    ///
1591    /// If the room was an invite that should be marked as a DM, will include it
1592    /// in the DM event after creating the joined room.
1593    ///
1594    /// If encrypted history sharing is enabled, will check to see if we have a
1595    /// key bundle, and import it if so.
1596    ///
1597    /// # Arguments
1598    ///
1599    /// * `room_id` - The `RoomId` of the room that was joined.
1600    /// * `pre_join_room_info` - Information about the room before we joined.
1601    async fn finish_join_room(
1602        &self,
1603        room_id: &RoomId,
1604        pre_join_room_info: Option<PreJoinRoomInfo>,
1605    ) -> Result<Room> {
1606        info!(?room_id, ?pre_join_room_info, "Completing room join");
1607        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1608            room.state() == RoomState::Invited
1609                && room.is_direct().await.unwrap_or_else(|e| {
1610                    warn!(%room_id, "is_direct() failed: {e}");
1611                    false
1612                })
1613        } else {
1614            false
1615        };
1616
1617        let base_room = self
1618            .base_client()
1619            .room_joined(
1620                room_id,
1621                pre_join_room_info
1622                    .as_ref()
1623                    .and_then(|info| info.inviter.as_ref())
1624                    .map(|i| i.user_id().to_owned()),
1625            )
1626            .await?;
1627        let room = Room::new(self.clone(), base_room);
1628
1629        if mark_as_dm {
1630            room.set_is_direct(true).await?;
1631        }
1632
1633        // If we joined following an invite, check if we had previously received a key
1634        // bundle from the inviter, and import it if so.
1635        //
1636        // It's important that we only do this once `BaseClient::room_joined` has
1637        // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1638        // race.
1639        #[cfg(feature = "e2e-encryption")]
1640        if self.inner.enable_share_history_on_invite
1641            && let Some(inviter) =
1642                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1643        {
1644            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1645                .await?;
1646        }
1647
1648        // Suppress "unused variable" and "unused field" lints
1649        #[cfg(not(feature = "e2e-encryption"))]
1650        let _ = pre_join_room_info.map(|i| i.inviter);
1651
1652        Ok(room)
1653    }
1654
1655    /// Join a room by `RoomId`.
1656    ///
1657    /// Returns the `Room` in the joined state.
1658    ///
1659    /// # Arguments
1660    ///
1661    /// * `room_id` - The `RoomId` of the room to be joined.
1662    #[instrument(skip(self))]
1663    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1664        // See who invited us to this room, if anyone. Note we have to do this before
1665        // making the `/join` request, otherwise we could race against the sync.
1666        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1667
1668        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1669        let response = self.send(request).await?;
1670        self.finish_join_room(&response.room_id, pre_join_info).await
1671    }
1672
1673    /// Join a room by `RoomOrAliasId`.
1674    ///
1675    /// Returns the `Room` in the joined state.
1676    ///
1677    /// # Arguments
1678    ///
1679    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1680    ///   alias looks like `#name:example.com`.
1681    /// * `server_names` - The server names to be used for resolving the alias,
1682    ///   if needs be.
1683    #[instrument(skip(self))]
1684    pub async fn join_room_by_id_or_alias(
1685        &self,
1686        alias: &RoomOrAliasId,
1687        server_names: &[OwnedServerName],
1688    ) -> Result<Room> {
1689        let pre_join_info = {
1690            match alias.try_into() {
1691                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1692                Err(_) => {
1693                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1694                    // responding to an invitation to the room, and therefore don't need to handle
1695                    // things that happen as a result of invites.
1696                    None
1697                }
1698            }
1699        };
1700        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1701            via: server_names.to_owned(),
1702        });
1703        let response = self.send(request).await?;
1704        self.finish_join_room(&response.room_id, pre_join_info).await
1705    }
1706
1707    /// Search the homeserver's directory of public rooms.
1708    ///
1709    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1710    /// a `get_public_rooms::Response`.
1711    ///
1712    /// # Arguments
1713    ///
1714    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1715    ///
1716    /// * `since` - Pagination token from a previous request.
1717    ///
1718    /// * `server` - The name of the server, if `None` the requested server is
1719    ///   used.
1720    ///
1721    /// # Examples
1722    /// ```no_run
1723    /// use matrix_sdk::Client;
1724    /// # use url::Url;
1725    /// # let homeserver = Url::parse("http://example.com").unwrap();
1726    /// # let limit = Some(10);
1727    /// # let since = Some("since token");
1728    /// # let server = Some("servername.com".try_into().unwrap());
1729    /// # async {
1730    /// let mut client = Client::new(homeserver).await.unwrap();
1731    ///
1732    /// client.public_rooms(limit, since, server).await;
1733    /// # };
1734    /// ```
1735    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1736    pub async fn public_rooms(
1737        &self,
1738        limit: Option<u32>,
1739        since: Option<&str>,
1740        server: Option<&ServerName>,
1741    ) -> HttpResult<get_public_rooms::v3::Response> {
1742        let limit = limit.map(UInt::from);
1743
1744        let request = assign!(get_public_rooms::v3::Request::new(), {
1745            limit,
1746            since: since.map(ToOwned::to_owned),
1747            server: server.map(ToOwned::to_owned),
1748        });
1749        self.send(request).await
1750    }
1751
1752    /// Create a room with the given parameters.
1753    ///
1754    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1755    /// created room.
1756    ///
1757    /// If you want to create a direct message with one specific user, you can
1758    /// use [`create_dm`][Self::create_dm], which is more convenient than
1759    /// assembling the [`create_room::v3::Request`] yourself.
1760    ///
1761    /// If the `is_direct` field of the request is set to `true` and at least
1762    /// one user is invited, the room will be automatically added to the direct
1763    /// rooms in the account data.
1764    ///
1765    /// # Examples
1766    ///
1767    /// ```no_run
1768    /// use matrix_sdk::{
1769    ///     Client,
1770    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1771    /// };
1772    /// # use url::Url;
1773    /// #
1774    /// # async {
1775    /// # let homeserver = Url::parse("http://example.com").unwrap();
1776    /// let request = CreateRoomRequest::new();
1777    /// let client = Client::new(homeserver).await.unwrap();
1778    /// assert!(client.create_room(request).await.is_ok());
1779    /// # };
1780    /// ```
1781    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1782        let invite = request.invite.clone();
1783        let is_direct_room = request.is_direct;
1784        let response = self.send(request).await?;
1785
1786        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1787
1788        let joined_room = Room::new(self.clone(), base_room);
1789
1790        if is_direct_room
1791            && !invite.is_empty()
1792            && let Err(error) =
1793                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1794        {
1795            // FIXME: Retry in the background
1796            error!("Failed to mark room as DM: {error}");
1797        }
1798
1799        Ok(joined_room)
1800    }
1801
1802    /// Create a DM room.
1803    ///
1804    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1805    /// given user being invited, the room marked `is_direct` and both the
1806    /// creator and invitee getting the default maximum power level.
1807    ///
1808    /// If the `e2e-encryption` feature is enabled, the room will also be
1809    /// encrypted.
1810    ///
1811    /// # Arguments
1812    ///
1813    /// * `user_id` - The ID of the user to create a DM for.
1814    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1815        #[cfg(feature = "e2e-encryption")]
1816        let initial_state = vec![
1817            InitialStateEvent::with_empty_state_key(
1818                RoomEncryptionEventContent::with_recommended_defaults(),
1819            )
1820            .to_raw_any(),
1821        ];
1822
1823        #[cfg(not(feature = "e2e-encryption"))]
1824        let initial_state = vec![];
1825
1826        let request = assign!(create_room::v3::Request::new(), {
1827            invite: vec![user_id.to_owned()],
1828            is_direct: true,
1829            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1830            initial_state,
1831        });
1832
1833        self.create_room(request).await
1834    }
1835
1836    /// Get the existing DM room with the given user, if any.
1837    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1838        let rooms = self.joined_rooms();
1839
1840        // Find the room we share with the `user_id` and only with `user_id`
1841        let room = rooms.into_iter().find(|r| {
1842            let targets = r.direct_targets();
1843            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1844        });
1845
1846        trace!(?user_id, ?room, "Found DM room with user");
1847        room
1848    }
1849
1850    /// Search the homeserver's directory for public rooms with a filter.
1851    ///
1852    /// # Arguments
1853    ///
1854    /// * `room_search` - The easiest way to create this request is using the
1855    ///   `get_public_rooms_filtered::Request` itself.
1856    ///
1857    /// # Examples
1858    ///
1859    /// ```no_run
1860    /// # use url::Url;
1861    /// # use matrix_sdk::Client;
1862    /// # async {
1863    /// # let homeserver = Url::parse("http://example.com")?;
1864    /// use matrix_sdk::ruma::{
1865    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1866    /// };
1867    /// # let mut client = Client::new(homeserver).await?;
1868    ///
1869    /// let mut filter = Filter::new();
1870    /// filter.generic_search_term = Some("rust".to_owned());
1871    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1872    /// request.filter = filter;
1873    ///
1874    /// let response = client.public_rooms_filtered(request).await?;
1875    ///
1876    /// for room in response.chunk {
1877    ///     println!("Found room {room:?}");
1878    /// }
1879    /// # anyhow::Ok(()) };
1880    /// ```
1881    pub async fn public_rooms_filtered(
1882        &self,
1883        request: get_public_rooms_filtered::v3::Request,
1884    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1885        self.send(request).await
1886    }
1887
1888    /// Send an arbitrary request to the server, without updating client state.
1889    ///
1890    /// **Warning:** Because this method *does not* update the client state, it
1891    /// is important to make sure that you account for this yourself, and
1892    /// use wrapper methods where available.  This method should *only* be
1893    /// used if a wrapper method for the endpoint you'd like to use is not
1894    /// available.
1895    ///
1896    /// # Arguments
1897    ///
1898    /// * `request` - A filled out and valid request for the endpoint to be hit
1899    ///
1900    /// * `timeout` - An optional request timeout setting, this overrides the
1901    ///   default request setting if one was set.
1902    ///
1903    /// # Examples
1904    ///
1905    /// ```no_run
1906    /// # use matrix_sdk::{Client, config::SyncSettings};
1907    /// # use url::Url;
1908    /// # async {
1909    /// # let homeserver = Url::parse("http://localhost:8080")?;
1910    /// # let mut client = Client::new(homeserver).await?;
1911    /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1912    ///
1913    /// // First construct the request you want to make
1914    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1915    /// // for all available Endpoints
1916    /// let user_id = owned_user_id!("@example:localhost");
1917    /// let request = profile::get_profile::v3::Request::new(user_id);
1918    ///
1919    /// // Start the request using Client::send()
1920    /// let response = client.send(request).await?;
1921    ///
1922    /// // Check the corresponding Response struct to find out what types are
1923    /// // returned
1924    /// # anyhow::Ok(()) };
1925    /// ```
1926    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1927    where
1928        Request: OutgoingRequest + Clone + Debug,
1929        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1930        Request::PathBuilder: SupportedPathBuilder,
1931        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1932        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1933    {
1934        SendRequest {
1935            client: self.clone(),
1936            request,
1937            config: None,
1938            send_progress: Default::default(),
1939        }
1940    }
1941
1942    pub(crate) async fn send_inner<Request>(
1943        &self,
1944        request: Request,
1945        config: Option<RequestConfig>,
1946        send_progress: SharedObservable<TransmissionProgress>,
1947    ) -> HttpResult<Request::IncomingResponse>
1948    where
1949        Request: OutgoingRequest + Debug,
1950        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1951        Request::PathBuilder: SupportedPathBuilder,
1952        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1953        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1954    {
1955        let homeserver = self.homeserver().to_string();
1956        let access_token = self.access_token();
1957        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1958
1959        let path_builder_input =
1960            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1961
1962        let result = self
1963            .inner
1964            .http_client
1965            .send(
1966                request,
1967                config,
1968                homeserver,
1969                access_token.as_deref(),
1970                path_builder_input,
1971                send_progress,
1972            )
1973            .await;
1974
1975        if let Err(Some(ErrorKind::UnknownToken { .. })) =
1976            result.as_ref().map_err(HttpError::client_api_error_kind)
1977            && let Some(access_token) = &access_token
1978        {
1979            // Mark the access token as expired.
1980            self.auth_ctx().set_access_token_expired(access_token);
1981        }
1982
1983        result
1984    }
1985
1986    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1987        _ = self
1988            .inner
1989            .auth_ctx
1990            .session_change_sender
1991            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1992    }
1993
1994    /// Fetches server versions from network; no caching.
1995    pub async fn fetch_server_versions(
1996        &self,
1997        request_config: Option<RequestConfig>,
1998    ) -> HttpResult<get_supported_versions::Response> {
1999        // Since this was called by the user, try to refresh the access token if
2000        // necessary.
2001        self.fetch_server_versions_inner(false, request_config).await
2002    }
2003
2004    /// Fetches server versions from network; no caching.
2005    ///
2006    /// If the access token is expired and `failsafe` is `false`, this will
2007    /// attempt to refresh the access token, otherwise this will try to make an
2008    /// unauthenticated request instead.
2009    pub(crate) async fn fetch_server_versions_inner(
2010        &self,
2011        failsafe: bool,
2012        request_config: Option<RequestConfig>,
2013    ) -> HttpResult<get_supported_versions::Response> {
2014        if !failsafe {
2015            // `Client::send()` handles refreshing access tokens.
2016            return self
2017                .send(get_supported_versions::Request::new())
2018                .with_request_config(request_config)
2019                .await;
2020        }
2021
2022        let homeserver = self.homeserver().to_string();
2023
2024        // If we have a fresh access token, try with it first.
2025        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2026            && self.auth_ctx().has_valid_access_token()
2027            && let Some(access_token) = self.access_token()
2028        {
2029            let result = self
2030                .inner
2031                .http_client
2032                .send(
2033                    get_supported_versions::Request::new(),
2034                    request_config,
2035                    homeserver.clone(),
2036                    Some(&access_token),
2037                    (),
2038                    Default::default(),
2039                )
2040                .await;
2041
2042            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2043                result.as_ref().map_err(HttpError::client_api_error_kind)
2044            {
2045                // If the access token is actually expired, mark it as expired and fallback to
2046                // the unauthenticated request below.
2047                self.auth_ctx().set_access_token_expired(&access_token);
2048            } else {
2049                // If the request succeeded or it's an other error, just stop now.
2050                return result;
2051            }
2052        }
2053
2054        // Try without authentication.
2055        self.inner
2056            .http_client
2057            .send(
2058                get_supported_versions::Request::new(),
2059                request_config,
2060                homeserver.clone(),
2061                None,
2062                (),
2063                Default::default(),
2064            )
2065            .await
2066    }
2067
2068    /// Fetches client well_known from network; no caching.
2069    ///
2070    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2071    ///    well-known contents.
2072    /// 2. If it's not, we try extracting the server name from the
2073    ///    [`Client::user_id`] and building the server URL from it.
2074    /// 3. If we couldn't get the well-known contents with either the explicit
2075    ///    server name or the implicit extracted one, we try the homeserver URL
2076    ///    as a last resort.
2077    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2078        let homeserver = self.homeserver();
2079        let scheme = homeserver.scheme();
2080
2081        // Use the server name, either an explicit one or an implicit one taken from
2082        // the user id: sometimes we'll have only the homeserver url available and no
2083        // server name, but the server name can be extracted from the current user id.
2084        let server_url = self
2085            .server()
2086            .map(|server| server.to_string())
2087            // If the server name wasn't available, extract it from the user id and build a URL:
2088            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2089            // will be the same for the public server url, lacking a better candidate.
2090            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2091
2092        // If the server name is available, first try using it
2093        let response = if let Some(server_url) = server_url {
2094            // First try using the server name
2095            self.fetch_client_well_known_with_url(server_url).await
2096        } else {
2097            None
2098        };
2099
2100        // If we didn't get a well-known value yet, try with the homeserver url instead:
2101        if response.is_none() {
2102            // Sometimes people configure their well-known directly on the homeserver so use
2103            // this as a fallback when the server name is unknown.
2104            warn!(
2105                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2106            );
2107            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2108        } else {
2109            response
2110        }
2111    }
2112
2113    async fn fetch_client_well_known_with_url(
2114        &self,
2115        url: String,
2116    ) -> Option<discover_homeserver::Response> {
2117        let well_known = self
2118            .inner
2119            .http_client
2120            .send(
2121                discover_homeserver::Request::new(),
2122                Some(RequestConfig::short_retry()),
2123                url,
2124                None,
2125                (),
2126                Default::default(),
2127            )
2128            .await;
2129
2130        match well_known {
2131            Ok(well_known) => Some(well_known),
2132            Err(http_error) => {
2133                // It is perfectly valid to not have a well-known file.
2134                // Maybe we should check for a specific error code to be sure?
2135                warn!("Failed to fetch client well-known: {http_error}");
2136                None
2137            }
2138        }
2139    }
2140
2141    /// Load supported versions from storage, or fetch them from network and
2142    /// cache them.
2143    ///
2144    /// If `failsafe` is true, this will try to minimize side effects to avoid
2145    /// possible deadlocks.
2146    async fn load_or_fetch_supported_versions(
2147        &self,
2148        failsafe: bool,
2149    ) -> HttpResult<SupportedVersionsResponse> {
2150        match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2151            Ok(Some(stored)) => {
2152                if let Some(supported_versions) =
2153                    stored.into_supported_versions().and_then(|value| value.into_data())
2154                {
2155                    return Ok(supported_versions);
2156                }
2157            }
2158            Ok(None) => {
2159                // fallthrough: cache is empty
2160            }
2161            Err(err) => {
2162                warn!("error when loading cached supported versions: {err}");
2163                // fallthrough to network.
2164            }
2165        }
2166
2167        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2168        let supported_versions = SupportedVersionsResponse {
2169            versions: server_versions.versions,
2170            unstable_features: server_versions.unstable_features,
2171        };
2172
2173        // Only attempt to cache the result in storage if the request was authenticated.
2174        if self.auth_ctx().has_valid_access_token()
2175            && let Err(err) = self
2176                .state_store()
2177                .set_kv_data(
2178                    StateStoreDataKey::SupportedVersions,
2179                    StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2180                        supported_versions.clone(),
2181                    )),
2182                )
2183                .await
2184        {
2185            warn!("error when caching supported versions: {err}");
2186        }
2187
2188        Ok(supported_versions)
2189    }
2190
2191    pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2192        let supported_versions = &self.inner.caches.supported_versions;
2193
2194        if let CachedValue::Cached(val) = &*supported_versions.read().await {
2195            Some(val.clone())
2196        } else {
2197            None
2198        }
2199    }
2200
2201    /// Get the Matrix versions and features supported by the homeserver by
2202    /// fetching them from the server or the cache.
2203    ///
2204    /// This is equivalent to calling both [`Client::server_versions()`] and
2205    /// [`Client::unstable_features()`]. To always fetch the result from the
2206    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2207    /// and then `.as_supported_versions()` on the response.
2208    ///
2209    /// # Examples
2210    ///
2211    /// ```no_run
2212    /// use ruma::api::{FeatureFlag, MatrixVersion};
2213    /// # use matrix_sdk::{Client, config::SyncSettings};
2214    /// # use url::Url;
2215    /// # async {
2216    /// # let homeserver = Url::parse("http://localhost:8080")?;
2217    /// # let mut client = Client::new(homeserver).await?;
2218    ///
2219    /// let supported = client.supported_versions().await?;
2220    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2221    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2222    ///
2223    /// let msc_x_feature = FeatureFlag::from("msc_x");
2224    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2225    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2226    /// # anyhow::Ok(()) };
2227    /// ```
2228    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2229        self.supported_versions_inner(false).await
2230    }
2231
2232    /// Get the Matrix versions and features supported by the homeserver by
2233    /// fetching them from the server or the cache.
2234    ///
2235    /// If `failsafe` is true, this will try to minimize side effects to avoid
2236    /// possible deadlocks.
2237    pub(crate) async fn supported_versions_inner(
2238        &self,
2239        failsafe: bool,
2240    ) -> HttpResult<SupportedVersions> {
2241        let cached_supported_versions = &self.inner.caches.supported_versions;
2242        if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2243            return Ok(val.clone());
2244        }
2245
2246        let mut guarded_supported_versions = cached_supported_versions.write().await;
2247        if let CachedValue::Cached(val) = &*guarded_supported_versions {
2248            return Ok(val.clone());
2249        }
2250
2251        let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2252
2253        // Fill both unstable features and server versions at once.
2254        let mut supported_versions = supported.supported_versions();
2255        if supported_versions.versions.is_empty() {
2256            supported_versions.versions = [MatrixVersion::V1_0].into();
2257        }
2258
2259        // Only cache the result if the request was authenticated.
2260        if self.auth_ctx().has_valid_access_token() {
2261            *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2262        }
2263
2264        Ok(supported_versions)
2265    }
2266
2267    /// Get the Matrix versions and features supported by the homeserver by
2268    /// fetching them from the cache.
2269    ///
2270    /// For a version of this function that fetches the supported versions and
2271    /// features from the homeserver if the [`SupportedVersions`] aren't
2272    /// found in the cache, take a look at the [`Client::server_versions()`]
2273    /// method.
2274    ///
2275    /// # Examples
2276    ///
2277    /// ```no_run
2278    /// use ruma::api::{FeatureFlag, MatrixVersion};
2279    /// # use matrix_sdk::{Client, config::SyncSettings};
2280    /// # use url::Url;
2281    /// # async {
2282    /// # let homeserver = Url::parse("http://localhost:8080")?;
2283    /// # let mut client = Client::new(homeserver).await?;
2284    ///
2285    /// let supported =
2286    ///     if let Some(supported) = client.supported_versions_cached().await? {
2287    ///         supported
2288    ///     } else {
2289    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2290    ///     };
2291    ///
2292    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2293    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2294    ///
2295    /// let msc_x_feature = FeatureFlag::from("msc_x");
2296    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2297    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2298    /// # anyhow::Ok(()) };
2299    /// ```
2300    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2301        if let Some(cached) = self.get_cached_supported_versions().await {
2302            Ok(Some(cached))
2303        } else if let Some(stored) =
2304            self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2305        {
2306            if let Some(supported) =
2307                stored.into_supported_versions().and_then(|value| value.into_data())
2308            {
2309                Ok(Some(supported.supported_versions()))
2310            } else {
2311                Ok(None)
2312            }
2313        } else {
2314            Ok(None)
2315        }
2316    }
2317
2318    /// Get the Matrix versions supported by the homeserver by fetching them
2319    /// from the server or the cache.
2320    ///
2321    /// # Examples
2322    ///
2323    /// ```no_run
2324    /// use ruma::api::MatrixVersion;
2325    /// # use matrix_sdk::{Client, config::SyncSettings};
2326    /// # use url::Url;
2327    /// # async {
2328    /// # let homeserver = Url::parse("http://localhost:8080")?;
2329    /// # let mut client = Client::new(homeserver).await?;
2330    ///
2331    /// let server_versions = client.server_versions().await?;
2332    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2333    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2334    /// # anyhow::Ok(()) };
2335    /// ```
2336    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2337        Ok(self.supported_versions().await?.versions)
2338    }
2339
2340    /// Get the unstable features supported by the homeserver by fetching them
2341    /// from the server or the cache.
2342    ///
2343    /// # Examples
2344    ///
2345    /// ```no_run
2346    /// use matrix_sdk::ruma::api::FeatureFlag;
2347    /// # use matrix_sdk::{Client, config::SyncSettings};
2348    /// # use url::Url;
2349    /// # async {
2350    /// # let homeserver = Url::parse("http://localhost:8080")?;
2351    /// # let mut client = Client::new(homeserver).await?;
2352    ///
2353    /// let msc_x_feature = FeatureFlag::from("msc_x");
2354    /// let unstable_features = client.unstable_features().await?;
2355    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2356    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2357    /// # anyhow::Ok(()) };
2358    /// ```
2359    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2360        Ok(self.supported_versions().await?.features)
2361    }
2362
2363    /// Empty the supported versions and unstable features cache.
2364    ///
2365    /// Since the SDK caches the supported versions, it's possible to have a
2366    /// stale entry in the cache. This functions makes it possible to force
2367    /// reset it.
2368    pub async fn reset_supported_versions(&self) -> Result<()> {
2369        // Empty the in-memory caches.
2370        self.inner.caches.supported_versions.write().await.take();
2371
2372        // Empty the store cache.
2373        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2374    }
2375
2376    /// Load well-known from storage, or fetch it from network and cache it.
2377    async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2378        match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2379            Ok(Some(stored)) => {
2380                if let Some(well_known) =
2381                    stored.into_well_known().and_then(|value| value.into_data())
2382                {
2383                    return Ok(well_known);
2384                }
2385            }
2386            Ok(None) => {
2387                // fallthrough: cache is empty
2388            }
2389            Err(err) => {
2390                warn!("error when loading cached well-known: {err}");
2391                // fallthrough to network.
2392            }
2393        }
2394
2395        let well_known = self.fetch_client_well_known().await.map(Into::into);
2396
2397        // Attempt to cache the result in storage.
2398        if let Err(err) = self
2399            .state_store()
2400            .set_kv_data(
2401                StateStoreDataKey::WellKnown,
2402                StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2403            )
2404            .await
2405        {
2406            warn!("error when caching well-known: {err}");
2407        }
2408
2409        Ok(well_known)
2410    }
2411
2412    async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2413        let well_known = &self.inner.caches.well_known;
2414        if let CachedValue::Cached(val) = &*well_known.read().await {
2415            return Ok(val.clone());
2416        }
2417
2418        let mut guarded_well_known = well_known.write().await;
2419        if let CachedValue::Cached(val) = &*guarded_well_known {
2420            return Ok(val.clone());
2421        }
2422
2423        let well_known = self.load_or_fetch_well_known().await?;
2424
2425        *guarded_well_known = CachedValue::Cached(well_known.clone());
2426
2427        Ok(well_known)
2428    }
2429
2430    /// Get information about the homeserver's advertised RTC foci by fetching
2431    /// the well-known file from the server or the cache.
2432    ///
2433    /// # Examples
2434    /// ```no_run
2435    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2436    /// # use url::Url;
2437    /// # async {
2438    /// # let homeserver = Url::parse("http://localhost:8080")?;
2439    /// # let mut client = Client::new(homeserver).await?;
2440    /// let rtc_foci = client.rtc_foci().await?;
2441    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2442    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2443    ///     _ => None,
2444    /// });
2445    /// if let Some(info) = default_livekit_focus_info {
2446    ///     println!("Default LiveKit service URL: {}", info.service_url);
2447    /// }
2448    /// # anyhow::Ok(()) };
2449    /// ```
2450    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2451        let well_known = self.get_or_load_and_cache_well_known().await?;
2452
2453        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2454    }
2455
2456    /// Empty the well-known cache.
2457    ///
2458    /// Since the SDK caches the well-known, it's possible to have a stale entry
2459    /// in the cache. This functions makes it possible to force reset it.
2460    pub async fn reset_well_known(&self) -> Result<()> {
2461        // Empty the in-memory caches.
2462        self.inner.caches.well_known.write().await.take();
2463
2464        // Empty the store cache.
2465        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2466    }
2467
2468    /// Check whether MSC 4028 is enabled on the homeserver.
2469    ///
2470    /// # Examples
2471    ///
2472    /// ```no_run
2473    /// # use matrix_sdk::{Client, config::SyncSettings};
2474    /// # use url::Url;
2475    /// # async {
2476    /// # let homeserver = Url::parse("http://localhost:8080")?;
2477    /// # let mut client = Client::new(homeserver).await?;
2478    /// let msc4028_enabled =
2479    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2480    /// # anyhow::Ok(()) };
2481    /// ```
2482    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2483        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2484    }
2485
2486    /// Get information of all our own devices.
2487    ///
2488    /// # Examples
2489    ///
2490    /// ```no_run
2491    /// # use matrix_sdk::{Client, config::SyncSettings};
2492    /// # use url::Url;
2493    /// # async {
2494    /// # let homeserver = Url::parse("http://localhost:8080")?;
2495    /// # let mut client = Client::new(homeserver).await?;
2496    /// let response = client.devices().await?;
2497    ///
2498    /// for device in response.devices {
2499    ///     println!(
2500    ///         "Device: {} {}",
2501    ///         device.device_id,
2502    ///         device.display_name.as_deref().unwrap_or("")
2503    ///     );
2504    /// }
2505    /// # anyhow::Ok(()) };
2506    /// ```
2507    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2508        let request = get_devices::v3::Request::new();
2509
2510        self.send(request).await
2511    }
2512
2513    /// Delete the given devices from the server.
2514    ///
2515    /// # Arguments
2516    ///
2517    /// * `devices` - The list of devices that should be deleted from the
2518    ///   server.
2519    ///
2520    /// * `auth_data` - This request requires user interactive auth, the first
2521    ///   request needs to set this to `None` and will always fail with an
2522    ///   `UiaaResponse`. The response will contain information for the
2523    ///   interactive auth and the same request needs to be made but this time
2524    ///   with some `auth_data` provided.
2525    ///
2526    /// ```no_run
2527    /// # use matrix_sdk::{
2528    /// #    ruma::{api::client::uiaa, owned_device_id},
2529    /// #    Client, Error, config::SyncSettings,
2530    /// # };
2531    /// # use serde_json::json;
2532    /// # use url::Url;
2533    /// # use std::collections::BTreeMap;
2534    /// # async {
2535    /// # let homeserver = Url::parse("http://localhost:8080")?;
2536    /// # let mut client = Client::new(homeserver).await?;
2537    /// let devices = &[owned_device_id!("DEVICEID")];
2538    ///
2539    /// if let Err(e) = client.delete_devices(devices, None).await {
2540    ///     if let Some(info) = e.as_uiaa_response() {
2541    ///         let mut password = uiaa::Password::new(
2542    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2543    ///             "wordpass".to_owned(),
2544    ///         );
2545    ///         password.session = info.session.clone();
2546    ///
2547    ///         client
2548    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2549    ///             .await?;
2550    ///     }
2551    /// }
2552    /// # anyhow::Ok(()) };
2553    pub async fn delete_devices(
2554        &self,
2555        devices: &[OwnedDeviceId],
2556        auth_data: Option<uiaa::AuthData>,
2557    ) -> HttpResult<delete_devices::v3::Response> {
2558        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2559        request.auth = auth_data;
2560
2561        self.send(request).await
2562    }
2563
2564    /// Change the display name of a device owned by the current user.
2565    ///
2566    /// Returns a `update_device::Response` which specifies the result
2567    /// of the operation.
2568    ///
2569    /// # Arguments
2570    ///
2571    /// * `device_id` - The ID of the device to change the display name of.
2572    /// * `display_name` - The new display name to set.
2573    pub async fn rename_device(
2574        &self,
2575        device_id: &DeviceId,
2576        display_name: &str,
2577    ) -> HttpResult<update_device::v3::Response> {
2578        let mut request = update_device::v3::Request::new(device_id.to_owned());
2579        request.display_name = Some(display_name.to_owned());
2580
2581        self.send(request).await
2582    }
2583
2584    /// Check whether a device with a specific ID exists on the server.
2585    ///
2586    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2587    /// with 404 and the underlying error otherwise.
2588    ///
2589    /// # Arguments
2590    ///
2591    /// * `device_id` - The ID of the device to query.
2592    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2593        let request = device::get_device::v3::Request::new(device_id);
2594        match self.send(request).await {
2595            Ok(_) => Ok(true),
2596            Err(err) => {
2597                if let Some(error) = err.as_client_api_error()
2598                    && error.status_code == 404
2599                {
2600                    Ok(false)
2601                } else {
2602                    Err(err.into())
2603                }
2604            }
2605        }
2606    }
2607
2608    /// Synchronize the client's state with the latest state on the server.
2609    ///
2610    /// ## Syncing Events
2611    ///
2612    /// Messages or any other type of event need to be periodically fetched from
2613    /// the server, this is achieved by sending a `/sync` request to the server.
2614    ///
2615    /// The first sync is sent out without a [`token`]. The response of the
2616    /// first sync will contain a [`next_batch`] field which should then be
2617    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2618    /// don't receive the same events multiple times.
2619    ///
2620    /// ## Long Polling
2621    ///
2622    /// A sync should in the usual case always be in flight. The
2623    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2624    /// long the server will wait for new events before it will respond.
2625    /// The server will respond immediately if some new events arrive before the
2626    /// timeout has expired. If no changes arrive and the timeout expires an
2627    /// empty sync response will be sent to the client.
2628    ///
2629    /// This method of sending a request that may not receive a response
2630    /// immediately is called long polling.
2631    ///
2632    /// ## Filtering Events
2633    ///
2634    /// The number or type of messages and events that the client should receive
2635    /// from the server can be altered using a [`Filter`].
2636    ///
2637    /// Filters can be non-trivial and, since they will be sent with every sync
2638    /// request, they may take up a bunch of unnecessary bandwidth.
2639    ///
2640    /// Luckily filters can be uploaded to the server and reused using an unique
2641    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2642    /// method.
2643    ///
2644    /// # Arguments
2645    ///
2646    /// * `sync_settings` - Settings for the sync call, this allows us to set
2647    /// various options to configure the sync:
2648    ///     * [`filter`] - To configure which events we receive and which get
2649    ///       [filtered] by the server
2650    ///     * [`timeout`] - To configure our [long polling] setup.
2651    ///     * [`token`] - To tell the server which events we already received
2652    ///       and where we wish to continue syncing.
2653    ///     * [`full_state`] - To tell the server that we wish to receive all
2654    ///       state events, regardless of our configured [`token`].
2655    ///     * [`set_presence`] - To tell the server to set the presence and to
2656    ///       which state.
2657    ///
2658    /// # Examples
2659    ///
2660    /// ```no_run
2661    /// # use url::Url;
2662    /// # async {
2663    /// # let homeserver = Url::parse("http://localhost:8080")?;
2664    /// # let username = "";
2665    /// # let password = "";
2666    /// use matrix_sdk::{
2667    ///     Client, config::SyncSettings,
2668    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2669    /// };
2670    ///
2671    /// let client = Client::new(homeserver).await?;
2672    /// client.matrix_auth().login_username(username, password).send().await?;
2673    ///
2674    /// // Sync once so we receive the client state and old messages.
2675    /// client.sync_once(SyncSettings::default()).await?;
2676    ///
2677    /// // Register our handler so we start responding once we receive a new
2678    /// // event.
2679    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2680    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2681    /// });
2682    ///
2683    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2684    /// // from our `sync_once()` call automatically.
2685    /// client.sync(SyncSettings::default()).await;
2686    /// # anyhow::Ok(()) };
2687    /// ```
2688    ///
2689    /// [`sync`]: #method.sync
2690    /// [`SyncSettings`]: crate::config::SyncSettings
2691    /// [`token`]: crate::config::SyncSettings#method.token
2692    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2693    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2694    /// [`set_presence`]: ruma::presence::PresenceState
2695    /// [`filter`]: crate::config::SyncSettings#method.filter
2696    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2697    /// [`next_batch`]: SyncResponse#structfield.next_batch
2698    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2699    /// [long polling]: #long-polling
2700    /// [filtered]: #filtering-events
2701    #[instrument(skip(self))]
2702    pub async fn sync_once(
2703        &self,
2704        sync_settings: crate::config::SyncSettings,
2705    ) -> Result<SyncResponse> {
2706        // The sync might not return for quite a while due to the timeout.
2707        // We'll see if there's anything crypto related to send out before we
2708        // sync, i.e. if we closed our client after a sync but before the
2709        // crypto requests were sent out.
2710        //
2711        // This will mostly be a no-op.
2712        #[cfg(feature = "e2e-encryption")]
2713        if let Err(e) = self.send_outgoing_requests().await {
2714            error!(error = ?e, "Error while sending outgoing E2EE requests");
2715        }
2716
2717        let token = match sync_settings.token {
2718            SyncToken::Specific(token) => Some(token),
2719            SyncToken::NoToken => None,
2720            SyncToken::ReusePrevious => self.sync_token().await,
2721        };
2722
2723        let request = assign!(sync_events::v3::Request::new(), {
2724            filter: sync_settings.filter.map(|f| *f),
2725            since: token,
2726            full_state: sync_settings.full_state,
2727            set_presence: sync_settings.set_presence,
2728            timeout: sync_settings.timeout,
2729            use_state_after: true,
2730        });
2731        let mut request_config = self.request_config();
2732        if let Some(timeout) = sync_settings.timeout {
2733            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2734            request_config.timeout = Some(base_timeout + timeout);
2735        }
2736
2737        let response = self.send(request).with_request_config(request_config).await?;
2738        let next_batch = response.next_batch.clone();
2739        let response = self.process_sync(response).await?;
2740
2741        #[cfg(feature = "e2e-encryption")]
2742        if let Err(e) = self.send_outgoing_requests().await {
2743            error!(error = ?e, "Error while sending outgoing E2EE requests");
2744        }
2745
2746        self.inner.sync_beat.notify(usize::MAX);
2747
2748        Ok(SyncResponse::new(next_batch, response))
2749    }
2750
2751    /// Repeatedly synchronize the client state with the server.
2752    ///
2753    /// This method will only return on error, if cancellation is needed
2754    /// the method should be wrapped in a cancelable task or the
2755    /// [`Client::sync_with_callback`] method can be used or
2756    /// [`Client::sync_with_result_callback`] if you want to handle error
2757    /// cases in the loop, too.
2758    ///
2759    /// This method will internally call [`Client::sync_once`] in a loop.
2760    ///
2761    /// This method can be used with the [`Client::add_event_handler`]
2762    /// method to react to individual events. If you instead wish to handle
2763    /// events in a bulk manner the [`Client::sync_with_callback`],
2764    /// [`Client::sync_with_result_callback`] and
2765    /// [`Client::sync_stream`] methods can be used instead. Those methods
2766    /// repeatedly return the whole sync response.
2767    ///
2768    /// # Arguments
2769    ///
2770    /// * `sync_settings` - Settings for the sync call. *Note* that those
2771    ///   settings will be only used for the first sync call. See the argument
2772    ///   docs for [`Client::sync_once`] for more info.
2773    ///
2774    /// # Return
2775    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2776    /// up to the user of the API to check the error and decide whether the sync
2777    /// should continue or not.
2778    ///
2779    /// # Examples
2780    ///
2781    /// ```no_run
2782    /// # use url::Url;
2783    /// # async {
2784    /// # let homeserver = Url::parse("http://localhost:8080")?;
2785    /// # let username = "";
2786    /// # let password = "";
2787    /// use matrix_sdk::{
2788    ///     Client, config::SyncSettings,
2789    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2790    /// };
2791    ///
2792    /// let client = Client::new(homeserver).await?;
2793    /// client.matrix_auth().login_username(&username, &password).send().await?;
2794    ///
2795    /// // Register our handler so we start responding once we receive a new
2796    /// // event.
2797    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2798    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2799    /// });
2800    ///
2801    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2802    /// // automatically.
2803    /// client.sync(SyncSettings::default()).await?;
2804    /// # anyhow::Ok(()) };
2805    /// ```
2806    ///
2807    /// [argument docs]: #method.sync_once
2808    /// [`sync_with_callback`]: #method.sync_with_callback
2809    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2810        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2811    }
2812
2813    /// Repeatedly call sync to synchronize the client state with the server.
2814    ///
2815    /// # Arguments
2816    ///
2817    /// * `sync_settings` - Settings for the sync call. *Note* that those
2818    ///   settings will be only used for the first sync call. See the argument
2819    ///   docs for [`Client::sync_once`] for more info.
2820    ///
2821    /// * `callback` - A callback that will be called every time a successful
2822    ///   response has been fetched from the server. The callback must return a
2823    ///   boolean which signalizes if the method should stop syncing. If the
2824    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2825    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2826    ///
2827    /// # Return
2828    /// The sync runs until an error occurs or the
2829    /// callback indicates that the Loop should stop. If the callback asked for
2830    /// a regular stop, the result will be `Ok(())` otherwise the
2831    /// `Err(Error)` is returned.
2832    ///
2833    /// # Examples
2834    ///
2835    /// The following example demonstrates how to sync forever while sending all
2836    /// the interesting events through a mpsc channel to another thread e.g. a
2837    /// UI thread.
2838    ///
2839    /// ```no_run
2840    /// # use std::time::Duration;
2841    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2842    /// # use url::Url;
2843    /// # async {
2844    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2845    /// # let mut client = Client::new(homeserver).await.unwrap();
2846    ///
2847    /// use tokio::sync::mpsc::channel;
2848    ///
2849    /// let (tx, rx) = channel(100);
2850    ///
2851    /// let sync_channel = &tx;
2852    /// let sync_settings = SyncSettings::new()
2853    ///     .timeout(Duration::from_secs(30));
2854    ///
2855    /// client
2856    ///     .sync_with_callback(sync_settings, |response| async move {
2857    ///         let channel = sync_channel;
2858    ///         for (room_id, room) in response.rooms.joined {
2859    ///             for event in room.timeline.events {
2860    ///                 channel.send(event).await.unwrap();
2861    ///             }
2862    ///         }
2863    ///
2864    ///         LoopCtrl::Continue
2865    ///     })
2866    ///     .await;
2867    /// };
2868    /// ```
2869    #[instrument(skip_all)]
2870    pub async fn sync_with_callback<C>(
2871        &self,
2872        sync_settings: crate::config::SyncSettings,
2873        callback: impl Fn(SyncResponse) -> C,
2874    ) -> Result<(), Error>
2875    where
2876        C: Future<Output = LoopCtrl>,
2877    {
2878        self.sync_with_result_callback(sync_settings, |result| async {
2879            Ok(callback(result?).await)
2880        })
2881        .await
2882    }
2883
2884    /// Repeatedly call sync to synchronize the client state with the server.
2885    ///
2886    /// # Arguments
2887    ///
2888    /// * `sync_settings` - Settings for the sync call. *Note* that those
2889    ///   settings will be only used for the first sync call. See the argument
2890    ///   docs for [`Client::sync_once`] for more info.
2891    ///
2892    /// * `callback` - A callback that will be called every time after a
2893    ///   response has been received, failure or not. The callback returns a
2894    ///   `Result<LoopCtrl, Error>`, too. When returning
2895    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2896    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2897    ///   function returns `Ok(())`. In case the callback can't handle the
2898    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2899    ///   which results in the sync ending and the `Err(Error)` being returned.
2900    ///
2901    /// # Return
2902    /// The sync runs until an error occurs that the callback can't handle or
2903    /// the callback indicates that the Loop should stop. If the callback
2904    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2905    /// `Err(Error)` is returned.
2906    ///
2907    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2908    /// this, and are handled first without sending the result to the
2909    /// callback. Only after they have exceeded is the `Result` handed to
2910    /// the callback.
2911    ///
2912    /// # Examples
2913    ///
2914    /// The following example demonstrates how to sync forever while sending all
2915    /// the interesting events through a mpsc channel to another thread e.g. a
2916    /// UI thread.
2917    ///
2918    /// ```no_run
2919    /// # use std::time::Duration;
2920    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2921    /// # use url::Url;
2922    /// # async {
2923    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2924    /// # let mut client = Client::new(homeserver).await.unwrap();
2925    /// #
2926    /// use tokio::sync::mpsc::channel;
2927    ///
2928    /// let (tx, rx) = channel(100);
2929    ///
2930    /// let sync_channel = &tx;
2931    /// let sync_settings = SyncSettings::new()
2932    ///     .timeout(Duration::from_secs(30));
2933    ///
2934    /// client
2935    ///     .sync_with_result_callback(sync_settings, |response| async move {
2936    ///         let channel = sync_channel;
2937    ///         let sync_response = response?;
2938    ///         for (room_id, room) in sync_response.rooms.joined {
2939    ///              for event in room.timeline.events {
2940    ///                  channel.send(event).await.unwrap();
2941    ///               }
2942    ///         }
2943    ///
2944    ///         Ok(LoopCtrl::Continue)
2945    ///     })
2946    ///     .await;
2947    /// };
2948    /// ```
2949    #[instrument(skip(self, callback))]
2950    pub async fn sync_with_result_callback<C>(
2951        &self,
2952        sync_settings: crate::config::SyncSettings,
2953        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2954    ) -> Result<(), Error>
2955    where
2956        C: Future<Output = Result<LoopCtrl, Error>>,
2957    {
2958        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2959
2960        while let Some(result) = sync_stream.next().await {
2961            trace!("Running callback");
2962            if callback(result).await? == LoopCtrl::Break {
2963                trace!("Callback told us to stop");
2964                break;
2965            }
2966            trace!("Done running callback");
2967        }
2968
2969        Ok(())
2970    }
2971
2972    //// Repeatedly synchronize the client state with the server.
2973    ///
2974    /// This method will internally call [`Client::sync_once`] in a loop and is
2975    /// equivalent to the [`Client::sync`] method but the responses are provided
2976    /// as an async stream.
2977    ///
2978    /// # Arguments
2979    ///
2980    /// * `sync_settings` - Settings for the sync call. *Note* that those
2981    ///   settings will be only used for the first sync call. See the argument
2982    ///   docs for [`Client::sync_once`] for more info.
2983    ///
2984    /// # Examples
2985    ///
2986    /// ```no_run
2987    /// # use url::Url;
2988    /// # async {
2989    /// # let homeserver = Url::parse("http://localhost:8080")?;
2990    /// # let username = "";
2991    /// # let password = "";
2992    /// use futures_util::StreamExt;
2993    /// use matrix_sdk::{Client, config::SyncSettings};
2994    ///
2995    /// let client = Client::new(homeserver).await?;
2996    /// client.matrix_auth().login_username(&username, &password).send().await?;
2997    ///
2998    /// let mut sync_stream =
2999    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
3000    ///
3001    /// while let Some(Ok(response)) = sync_stream.next().await {
3002    ///     for room in response.rooms.joined.values() {
3003    ///         for e in &room.timeline.events {
3004    ///             if let Ok(event) = e.raw().deserialize() {
3005    ///                 println!("Received event {:?}", event);
3006    ///             }
3007    ///         }
3008    ///     }
3009    /// }
3010    ///
3011    /// # anyhow::Ok(()) };
3012    /// ```
3013    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3014    #[instrument(skip(self))]
3015    pub async fn sync_stream(
3016        &self,
3017        mut sync_settings: crate::config::SyncSettings,
3018    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3019        let mut is_first_sync = true;
3020        let mut timeout = None;
3021        let mut last_sync_time: Option<Instant> = None;
3022
3023        let parent_span = Span::current();
3024
3025        async_stream::stream!({
3026            loop {
3027                trace!("Syncing");
3028
3029                if sync_settings.ignore_timeout_on_first_sync {
3030                    if is_first_sync {
3031                        timeout = sync_settings.timeout.take();
3032                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
3033                        sync_settings.timeout = timeout.take();
3034                    }
3035
3036                    is_first_sync = false;
3037                }
3038
3039                yield self
3040                    .sync_loop_helper(&mut sync_settings)
3041                    .instrument(parent_span.clone())
3042                    .await;
3043
3044                Client::delay_sync(&mut last_sync_time).await
3045            }
3046        })
3047    }
3048
3049    /// Get the current, if any, sync token of the client.
3050    /// This will be None if the client didn't sync at least once.
3051    pub(crate) async fn sync_token(&self) -> Option<String> {
3052        self.inner.base_client.sync_token().await
3053    }
3054
3055    /// Gets information about the owner of a given access token.
3056    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3057        let request = whoami::v3::Request::new();
3058        self.send(request).await
3059    }
3060
3061    /// Subscribes a new receiver to client SessionChange broadcasts.
3062    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3063        let broadcast = &self.auth_ctx().session_change_sender;
3064        broadcast.subscribe()
3065    }
3066
3067    /// Sets the save/restore session callbacks.
3068    ///
3069    /// This is another mechanism to get synchronous updates to session tokens,
3070    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3071    pub fn set_session_callbacks(
3072        &self,
3073        reload_session_callback: Box<ReloadSessionCallback>,
3074        save_session_callback: Box<SaveSessionCallback>,
3075    ) -> Result<()> {
3076        self.inner
3077            .auth_ctx
3078            .reload_session_callback
3079            .set(reload_session_callback)
3080            .map_err(|_| Error::MultipleSessionCallbacks)?;
3081
3082        self.inner
3083            .auth_ctx
3084            .save_session_callback
3085            .set(save_session_callback)
3086            .map_err(|_| Error::MultipleSessionCallbacks)?;
3087
3088        Ok(())
3089    }
3090
3091    /// Get the notification settings of the current owner of the client.
3092    pub async fn notification_settings(&self) -> NotificationSettings {
3093        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3094        NotificationSettings::new(self.clone(), ruleset)
3095    }
3096
3097    /// Create a new specialized `Client` that can process notifications.
3098    ///
3099    /// See [`CrossProcessLock::new`] to learn more about
3100    /// `cross_process_lock_config`.
3101    ///
3102    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3103    pub async fn notification_client(
3104        &self,
3105        cross_process_lock_config: CrossProcessLockConfig,
3106    ) -> Result<Client> {
3107        let client = Client {
3108            inner: ClientInner::new(
3109                self.inner.auth_ctx.clone(),
3110                self.server().cloned(),
3111                self.homeserver(),
3112                self.sliding_sync_version(),
3113                self.inner.http_client.clone(),
3114                self.inner
3115                    .base_client
3116                    .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3117                    .await?,
3118                self.inner.caches.supported_versions.read().await.clone(),
3119                self.inner.caches.well_known.read().await.clone(),
3120                self.inner.respect_login_well_known,
3121                self.inner.event_cache.clone(),
3122                self.inner.send_queue_data.clone(),
3123                self.inner.latest_events.clone(),
3124                #[cfg(feature = "e2e-encryption")]
3125                self.inner.e2ee.encryption_settings,
3126                #[cfg(feature = "e2e-encryption")]
3127                self.inner.enable_share_history_on_invite,
3128                cross_process_lock_config,
3129                #[cfg(feature = "experimental-search")]
3130                self.inner.search_index.clone(),
3131                self.inner.thread_subscription_catchup.clone(),
3132            )
3133            .await,
3134        };
3135
3136        Ok(client)
3137    }
3138
3139    /// The [`EventCache`] instance for this [`Client`].
3140    pub fn event_cache(&self) -> &EventCache {
3141        // SAFETY: always initialized in the `Client` ctor.
3142        self.inner.event_cache.get().unwrap()
3143    }
3144
3145    /// The [`LatestEvents`] instance for this [`Client`].
3146    pub async fn latest_events(&self) -> &LatestEvents {
3147        self.inner
3148            .latest_events
3149            .get_or_init(|| async {
3150                LatestEvents::new(
3151                    WeakClient::from_client(self),
3152                    self.event_cache().clone(),
3153                    SendQueue::new(self.clone()),
3154                    self.room_info_notable_update_receiver(),
3155                )
3156            })
3157            .await
3158    }
3159
3160    /// Waits until an at least partially synced room is received, and returns
3161    /// it.
3162    ///
3163    /// **Note: this function will loop endlessly until either it finds the room
3164    /// or an externally set timeout happens.**
3165    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3166        loop {
3167            if let Some(room) = self.get_room(room_id) {
3168                if room.is_state_partially_or_fully_synced() {
3169                    debug!("Found just created room!");
3170                    return room;
3171                }
3172                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3173            } else {
3174                debug!("Room wasn't found, waiting for sync beat to try again");
3175            }
3176            self.inner.sync_beat.listen().await;
3177        }
3178    }
3179
3180    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3181    /// join it.
3182    pub async fn knock(
3183        &self,
3184        room_id_or_alias: OwnedRoomOrAliasId,
3185        reason: Option<String>,
3186        server_names: Vec<OwnedServerName>,
3187    ) -> Result<Room> {
3188        let request =
3189            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3190        let response = self.send(request).await?;
3191        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3192        Ok(Room::new(self.clone(), base_room))
3193    }
3194
3195    /// Checks whether the provided `user_id` belongs to an ignored user.
3196    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3197        self.base_client().is_user_ignored(user_id).await
3198    }
3199
3200    /// Gets the `max_upload_size` value from the homeserver, getting either a
3201    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3202    /// missing.
3203    ///
3204    /// Check the spec for more info:
3205    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3206    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3207        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3208        if let Some(data) = max_upload_size_lock.get() {
3209            return Ok(data.to_owned());
3210        }
3211
3212        // Use the authenticated endpoint when the server supports it.
3213        let supported_versions = self.supported_versions().await?;
3214        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3215            .is_supported(&supported_versions);
3216
3217        let upload_size = if use_auth {
3218            self.send(authenticated_media::get_media_config::v1::Request::default())
3219                .await?
3220                .upload_size
3221        } else {
3222            #[allow(deprecated)]
3223            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3224        };
3225
3226        match max_upload_size_lock.set(upload_size) {
3227            Ok(_) => Ok(upload_size),
3228            Err(error) => {
3229                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3230            }
3231        }
3232    }
3233
3234    /// The settings to use for decrypting events.
3235    #[cfg(feature = "e2e-encryption")]
3236    pub fn decryption_settings(&self) -> &DecryptionSettings {
3237        &self.base_client().decryption_settings
3238    }
3239
3240    /// Returns the [`SearchIndex`] for this [`Client`].
3241    #[cfg(feature = "experimental-search")]
3242    pub fn search_index(&self) -> &SearchIndex {
3243        &self.inner.search_index
3244    }
3245
3246    /// Whether the client is configured to take thread subscriptions (MSC4306
3247    /// and MSC4308) into account.
3248    ///
3249    /// This may cause filtering out of thread subscriptions, and loading the
3250    /// thread subscriptions via the sliding sync extension, when the room
3251    /// list service is being used.
3252    pub fn enabled_thread_subscriptions(&self) -> bool {
3253        match self.base_client().threading_support {
3254            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
3255            ThreadingSupport::Disabled => false,
3256        }
3257    }
3258
3259    /// Fetch thread subscriptions changes between `from` and up to `to`.
3260    ///
3261    /// The `limit` optional parameter can be used to limit the number of
3262    /// entries in a response. It can also be overridden by the server, if
3263    /// it's deemed too large.
3264    pub async fn fetch_thread_subscriptions(
3265        &self,
3266        from: Option<String>,
3267        to: Option<String>,
3268        limit: Option<UInt>,
3269    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3270        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3271            from,
3272            to,
3273            limit,
3274        });
3275        Ok(self.send(request).await?)
3276    }
3277
3278    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3279        self.inner.thread_subscription_catchup.get().unwrap()
3280    }
3281
3282    /// Perform database optimizations if any are available, i.e. vacuuming in
3283    /// SQLite.
3284    ///
3285    /// **Warning:** this was added to check if SQLite fragmentation was the
3286    /// source of performance issues, **DO NOT use in production**.
3287    #[doc(hidden)]
3288    pub async fn optimize_stores(&self) -> Result<()> {
3289        trace!("Optimizing state store...");
3290        self.state_store().optimize().await?;
3291
3292        trace!("Optimizing event cache store...");
3293        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3294            clean_lock.optimize().await?;
3295        }
3296
3297        trace!("Optimizing media store...");
3298        self.media_store().lock().await?.optimize().await?;
3299
3300        Ok(())
3301    }
3302
3303    /// Returns the sizes of the existing stores, if known.
3304    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3305        #[cfg(feature = "e2e-encryption")]
3306        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3307            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3308        {
3309            Some(store_size)
3310        } else {
3311            None
3312        };
3313        #[cfg(not(feature = "e2e-encryption"))]
3314        let crypto_store_size = None;
3315
3316        let state_store_size = self.state_store().get_size().await.ok().flatten();
3317
3318        let event_cache_store_size = if let Some(clean_lock) =
3319            self.event_cache_store().lock().await?.as_clean()
3320            && let Ok(Some(store_size)) = clean_lock.get_size().await
3321        {
3322            Some(store_size)
3323        } else {
3324            None
3325        };
3326
3327        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3328
3329        Ok(StoreSizes {
3330            crypto_store: crypto_store_size,
3331            state_store: state_store_size,
3332            event_cache_store: event_cache_store_size,
3333            media_store: media_store_size,
3334        })
3335    }
3336
3337    /// Get a reference to the client's task monitor, for spawning background
3338    /// tasks.
3339    pub fn task_monitor(&self) -> &TaskMonitor {
3340        &self.inner.task_monitor
3341    }
3342
3343    /// Add a subscriber for duplicate key upload error notifications triggered
3344    /// by requests to /keys/upload.
3345    #[cfg(feature = "e2e-encryption")]
3346    pub fn subscribe_to_duplicate_key_upload_errors(
3347        &self,
3348    ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3349        self.inner.duplicate_key_upload_error_sender.subscribe()
3350    }
3351
3352    /// Check the record of whether we are waiting for an [MSC4268] key bundle
3353    /// for the given room.
3354    ///
3355    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3356    #[cfg(feature = "e2e-encryption")]
3357    pub async fn get_pending_key_bundle_details_for_room(
3358        &self,
3359        room_id: &RoomId,
3360    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3361        Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3362    }
3363}
3364
3365/// Contains the disk size of the different stores, if known. It won't be
3366/// available for in-memory stores.
3367#[derive(Debug, Clone)]
3368pub struct StoreSizes {
3369    /// The size of the CryptoStore.
3370    pub crypto_store: Option<usize>,
3371    /// The size of the StateStore.
3372    pub state_store: Option<usize>,
3373    /// The size of the EventCacheStore.
3374    pub event_cache_store: Option<usize>,
3375    /// The size of the MediaStore.
3376    pub media_store: Option<usize>,
3377}
3378
3379#[cfg(any(feature = "testing", test))]
3380impl Client {
3381    /// Test helper to mark users as tracked by the crypto layer.
3382    #[cfg(feature = "e2e-encryption")]
3383    pub async fn update_tracked_users_for_testing(
3384        &self,
3385        user_ids: impl IntoIterator<Item = &UserId>,
3386    ) {
3387        let olm = self.olm_machine().await;
3388        let olm = olm.as_ref().unwrap();
3389        olm.update_tracked_users(user_ids).await.unwrap();
3390    }
3391}
3392
3393/// A weak reference to the inner client, useful when trying to get a handle
3394/// on the owning client.
3395#[derive(Clone, Debug)]
3396pub(crate) struct WeakClient {
3397    client: Weak<ClientInner>,
3398}
3399
3400impl WeakClient {
3401    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3402    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3403        Self { client: Arc::downgrade(client) }
3404    }
3405
3406    /// Construct a [`WeakClient`] from a [`Client`].
3407    pub fn from_client(client: &Client) -> Self {
3408        Self::from_inner(&client.inner)
3409    }
3410
3411    /// Attempts to get a [`Client`] from this [`WeakClient`].
3412    pub fn get(&self) -> Option<Client> {
3413        self.client.upgrade().map(|inner| Client { inner })
3414    }
3415
3416    /// Gets the number of strong (`Arc`) pointers still pointing to this
3417    /// client.
3418    #[allow(dead_code)]
3419    pub fn strong_count(&self) -> usize {
3420        self.client.strong_count()
3421    }
3422}
3423
3424/// Information about the state of a room before we joined it.
3425#[derive(Debug, Clone, Default)]
3426struct PreJoinRoomInfo {
3427    /// The user who invited us to the room, if any.
3428    pub inviter: Option<RoomMember>,
3429}
3430
3431// The http mocking library is not supported for wasm32
3432#[cfg(all(test, not(target_family = "wasm")))]
3433pub(crate) mod tests {
3434    use std::{sync::Arc, time::Duration};
3435
3436    use assert_matches::assert_matches;
3437    use assert_matches2::assert_let;
3438    use eyeball::SharedObservable;
3439    use futures_util::{FutureExt, StreamExt, pin_mut};
3440    use js_int::{UInt, uint};
3441    use matrix_sdk_base::{
3442        RoomState,
3443        store::{MemoryStore, StoreConfig},
3444    };
3445    use matrix_sdk_test::{
3446        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3447        event_factory::EventFactory,
3448    };
3449    #[cfg(target_family = "wasm")]
3450    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3451
3452    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3453    use ruma::{
3454        RoomId, ServerName, UserId,
3455        api::{
3456            FeatureFlag, MatrixVersion,
3457            client::{
3458                discovery::discover_homeserver::RtcFocusInfo,
3459                room::create_room::v3::Request as CreateRoomRequest,
3460            },
3461        },
3462        assign,
3463        events::{
3464            ignored_user_list::IgnoredUserListEventContent,
3465            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3466        },
3467        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3468    };
3469    use serde_json::json;
3470    use stream_assert::{assert_next_matches, assert_pending};
3471    use tokio::{
3472        spawn,
3473        time::{sleep, timeout},
3474    };
3475    use url::Url;
3476
3477    use super::Client;
3478    use crate::{
3479        Error, TransmissionProgress,
3480        client::{WeakClient, futures::SendMediaUploadRequest},
3481        config::{RequestConfig, SyncSettings},
3482        futures::SendRequest,
3483        media::MediaError,
3484        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3485    };
3486
3487    #[async_test]
3488    async fn test_account_data() {
3489        let server = MatrixMockServer::new().await;
3490        let client = server.client_builder().build().await;
3491
3492        let f = EventFactory::new();
3493        server
3494            .mock_sync()
3495            .ok_and_run(&client, |builder| {
3496                builder.add_global_account_data(
3497                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3498                );
3499            })
3500            .await;
3501
3502        let content = client
3503            .account()
3504            .account_data::<IgnoredUserListEventContent>()
3505            .await
3506            .unwrap()
3507            .unwrap()
3508            .deserialize()
3509            .unwrap();
3510
3511        assert_eq!(content.ignored_users.len(), 1);
3512    }
3513
3514    #[async_test]
3515    async fn test_successful_discovery() {
3516        // Imagine this is `matrix.org`.
3517        let server = MatrixMockServer::new().await;
3518        let server_url = server.uri();
3519
3520        // Imagine this is `matrix-client.matrix.org`.
3521        let homeserver = MatrixMockServer::new().await;
3522        let homeserver_url = homeserver.uri();
3523
3524        // Imagine Alice has the user ID `@alice:matrix.org`.
3525        let domain = server_url.strip_prefix("http://").unwrap();
3526        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3527
3528        // The `.well-known` is on the server (e.g. `matrix.org`).
3529        server
3530            .mock_well_known()
3531            .ok_with_homeserver_url(&homeserver_url)
3532            .mock_once()
3533            .named("well-known")
3534            .mount()
3535            .await;
3536
3537        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3538        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3539
3540        let client = Client::builder()
3541            .insecure_server_name_no_tls(alice.server_name())
3542            .build()
3543            .await
3544            .unwrap();
3545
3546        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3547        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3548        client.server_versions().await.unwrap();
3549    }
3550
3551    #[async_test]
3552    async fn test_discovery_broken_server() {
3553        let server = MatrixMockServer::new().await;
3554        let server_url = server.uri();
3555        let domain = server_url.strip_prefix("http://").unwrap();
3556        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3557
3558        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3559
3560        assert!(
3561            Client::builder()
3562                .insecure_server_name_no_tls(alice.server_name())
3563                .build()
3564                .await
3565                .is_err(),
3566            "Creating a client from a user ID should fail when the .well-known request fails."
3567        );
3568    }
3569
3570    #[async_test]
3571    async fn test_room_creation() {
3572        let server = MatrixMockServer::new().await;
3573        let client = server.client_builder().build().await;
3574
3575        let f = EventFactory::new().sender(user_id!("@example:localhost"));
3576        server
3577            .mock_sync()
3578            .ok_and_run(&client, |builder| {
3579                builder.add_joined_room(
3580                    JoinedRoomBuilder::default()
3581                        .add_state_event(
3582                            f.member(user_id!("@example:localhost")).display_name("example"),
3583                        )
3584                        .add_state_event(f.default_power_levels()),
3585                );
3586            })
3587            .await;
3588
3589        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3590        assert_eq!(room.state(), RoomState::Joined);
3591    }
3592
3593    #[async_test]
3594    async fn test_retry_limit_http_requests() {
3595        let server = MatrixMockServer::new().await;
3596        let client = server
3597            .client_builder()
3598            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3599            .build()
3600            .await;
3601
3602        assert!(client.request_config().retry_limit.unwrap() == 4);
3603
3604        server.mock_who_am_i().error500().expect(4).mount().await;
3605
3606        client.whoami().await.unwrap_err();
3607    }
3608
3609    #[async_test]
3610    async fn test_retry_timeout_http_requests() {
3611        // Keep this timeout small so that the test doesn't take long
3612        let retry_timeout = Duration::from_secs(5);
3613        let server = MatrixMockServer::new().await;
3614        let client = server
3615            .client_builder()
3616            .on_builder(|builder| {
3617                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3618            })
3619            .build()
3620            .await;
3621
3622        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3623
3624        server.mock_login().error500().expect(2..).mount().await;
3625
3626        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3627    }
3628
3629    #[async_test]
3630    async fn test_short_retry_initial_http_requests() {
3631        let server = MatrixMockServer::new().await;
3632        let client = server
3633            .client_builder()
3634            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3635            .build()
3636            .await;
3637
3638        server.mock_login().error500().expect(3..).mount().await;
3639
3640        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3641    }
3642
3643    #[async_test]
3644    async fn test_no_retry_http_requests() {
3645        let server = MatrixMockServer::new().await;
3646        let client = server.client_builder().build().await;
3647
3648        server.mock_devices().error500().mock_once().mount().await;
3649
3650        client.devices().await.unwrap_err();
3651    }
3652
3653    #[async_test]
3654    async fn test_set_homeserver() {
3655        let client = MockClientBuilder::new(None).build().await;
3656        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3657
3658        let homeserver = Url::parse("http://example.com/").unwrap();
3659        client.set_homeserver(homeserver.clone());
3660        assert_eq!(client.homeserver(), homeserver);
3661    }
3662
3663    #[async_test]
3664    async fn test_search_user_request() {
3665        let server = MatrixMockServer::new().await;
3666        let client = server.client_builder().build().await;
3667
3668        server.mock_user_directory().ok().mock_once().mount().await;
3669
3670        let response = client.search_users("test", 50).await.unwrap();
3671        assert_eq!(response.results.len(), 1);
3672        let result = &response.results[0];
3673        assert_eq!(result.user_id.to_string(), "@test:example.me");
3674        assert_eq!(result.display_name.clone().unwrap(), "Test");
3675        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3676        assert!(!response.limited);
3677    }
3678
3679    #[async_test]
3680    async fn test_request_unstable_features() {
3681        let server = MatrixMockServer::new().await;
3682        let client = server.client_builder().no_server_versions().build().await;
3683
3684        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3685
3686        let unstable_features = client.unstable_features().await.unwrap();
3687        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3688        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3689    }
3690
3691    #[async_test]
3692    async fn test_can_homeserver_push_encrypted_event_to_device() {
3693        let server = MatrixMockServer::new().await;
3694        let client = server.client_builder().no_server_versions().build().await;
3695
3696        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3697
3698        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3699        assert!(msc4028_enabled);
3700    }
3701
3702    #[async_test]
3703    async fn test_recently_visited_rooms() {
3704        // Tracking recently visited rooms requires authentication
3705        let client = MockClientBuilder::new(None).unlogged().build().await;
3706        assert_matches!(
3707            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3708            Err(Error::AuthenticationRequired)
3709        );
3710
3711        let client = MockClientBuilder::new(None).build().await;
3712        let account = client.account();
3713
3714        // We should start off with an empty list
3715        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3716
3717        // Tracking a valid room id should add it to the list
3718        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3719        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3720        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3721
3722        // And the existing list shouldn't be changed
3723        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3724        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3725
3726        // Tracking the same room again shouldn't change the list
3727        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3728        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3729        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3730
3731        // Tracking a second room should add it to the front of the list
3732        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3733        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3734        assert_eq!(
3735            account.get_recently_visited_rooms().await.unwrap(),
3736            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3737        );
3738
3739        // Tracking the first room yet again should move it to the front of the list
3740        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3741        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3742        assert_eq!(
3743            account.get_recently_visited_rooms().await.unwrap(),
3744            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3745        );
3746
3747        // Tracking should be capped at 20
3748        for n in 0..20 {
3749            account
3750                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3751                .await
3752                .unwrap();
3753        }
3754
3755        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3756
3757        // And the initial rooms should've been pushed out
3758        let rooms = account.get_recently_visited_rooms().await.unwrap();
3759        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3760        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3761
3762        // And the last tracked room should be the first
3763        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3764    }
3765
3766    #[async_test]
3767    async fn test_client_no_cycle_with_event_cache() {
3768        let client = MockClientBuilder::new(None).build().await;
3769
3770        // Wait for the init tasks to die.
3771        sleep(Duration::from_secs(1)).await;
3772
3773        let weak_client = WeakClient::from_client(&client);
3774        assert_eq!(weak_client.strong_count(), 1);
3775
3776        {
3777            let room_id = room_id!("!room:example.org");
3778
3779            // Have the client know the room.
3780            let response = SyncResponseBuilder::default()
3781                .add_joined_room(JoinedRoomBuilder::new(room_id))
3782                .build_sync_response();
3783            client.inner.base_client.receive_sync_response(response).await.unwrap();
3784
3785            client.event_cache().subscribe().unwrap();
3786
3787            let (_room_event_cache, _drop_handles) =
3788                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3789        }
3790
3791        drop(client);
3792
3793        // Give a bit of time for background tasks to die.
3794        sleep(Duration::from_secs(1)).await;
3795
3796        // The weak client must be the last reference to the client now.
3797        assert_eq!(weak_client.strong_count(), 0);
3798        let client = weak_client.get();
3799        assert!(
3800            client.is_none(),
3801            "too many strong references to the client: {}",
3802            Arc::strong_count(&client.unwrap().inner)
3803        );
3804    }
3805
3806    #[async_test]
3807    async fn test_supported_versions_caching() {
3808        let server = MatrixMockServer::new().await;
3809
3810        let versions_mock = server
3811            .mock_versions()
3812            .expect_default_access_token()
3813            .ok_with_unstable_features()
3814            .named("first versions mock")
3815            .expect(1)
3816            .mount_as_scoped()
3817            .await;
3818
3819        let memory_store = Arc::new(MemoryStore::new());
3820        let client = server
3821            .client_builder()
3822            .no_server_versions()
3823            .on_builder(|builder| {
3824                builder.store_config(
3825                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3826                        .state_store(memory_store.clone()),
3827                )
3828            })
3829            .build()
3830            .await;
3831
3832        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3833
3834        // The result was cached.
3835        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3836        // This subsequent call hits the in-memory cache.
3837        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3838
3839        drop(client);
3840
3841        let client = server
3842            .client_builder()
3843            .no_server_versions()
3844            .on_builder(|builder| {
3845                builder.store_config(
3846                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3847                        .state_store(memory_store.clone()),
3848                )
3849            })
3850            .build()
3851            .await;
3852
3853        // These calls to the new client hit the on-disk cache.
3854        assert!(
3855            client
3856                .unstable_features()
3857                .await
3858                .unwrap()
3859                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3860        );
3861        let supported = client.supported_versions().await.unwrap();
3862        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3863        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3864
3865        // Then this call hits the in-memory cache.
3866        let supported = client.supported_versions().await.unwrap();
3867        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3868        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3869
3870        drop(versions_mock);
3871
3872        // Now, reset the cache, and observe the endpoints being called again once.
3873        client.reset_supported_versions().await.unwrap();
3874
3875        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3876
3877        // Hits network again.
3878        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3879        // Hits in-memory cache again.
3880        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3881    }
3882
3883    #[async_test]
3884    async fn test_well_known_caching() {
3885        let server = MatrixMockServer::new().await;
3886        let server_url = server.uri();
3887        let domain = server_url.strip_prefix("http://").unwrap();
3888        let server_name = <&ServerName>::try_from(domain).unwrap();
3889        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3890
3891        let well_known_mock = server
3892            .mock_well_known()
3893            .ok()
3894            .named("well known mock")
3895            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3896            .mount_as_scoped()
3897            .await;
3898
3899        let memory_store = Arc::new(MemoryStore::new());
3900        let client = Client::builder()
3901            .insecure_server_name_no_tls(server_name)
3902            .store_config(
3903                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3904                    .state_store(memory_store.clone()),
3905            )
3906            .build()
3907            .await
3908            .unwrap();
3909
3910        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3911
3912        // This subsequent call hits the in-memory cache.
3913        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3914
3915        drop(client);
3916
3917        let client = server
3918            .client_builder()
3919            .no_server_versions()
3920            .on_builder(|builder| {
3921                builder.store_config(
3922                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3923                        .state_store(memory_store.clone()),
3924                )
3925            })
3926            .build()
3927            .await;
3928
3929        // This call to the new client hits the on-disk cache.
3930        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3931
3932        // Then this call hits the in-memory cache.
3933        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3934
3935        drop(well_known_mock);
3936
3937        // Now, reset the cache, and observe the endpoints being called again once.
3938        client.reset_well_known().await.unwrap();
3939
3940        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3941
3942        // Hits network again.
3943        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3944        // Hits in-memory cache again.
3945        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3946    }
3947
3948    #[async_test]
3949    async fn test_missing_well_known_caching() {
3950        let server = MatrixMockServer::new().await;
3951        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3952
3953        let well_known_mock = server
3954            .mock_well_known()
3955            .error_unrecognized()
3956            .named("first well-known mock")
3957            .expect(1)
3958            .mount_as_scoped()
3959            .await;
3960
3961        let memory_store = Arc::new(MemoryStore::new());
3962        let client = server
3963            .client_builder()
3964            .on_builder(|builder| {
3965                builder.store_config(
3966                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3967                        .state_store(memory_store.clone()),
3968                )
3969            })
3970            .build()
3971            .await;
3972
3973        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3974
3975        // This subsequent call hits the in-memory cache.
3976        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3977
3978        drop(client);
3979
3980        let client = server
3981            .client_builder()
3982            .on_builder(|builder| {
3983                builder.store_config(
3984                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3985                        .state_store(memory_store.clone()),
3986                )
3987            })
3988            .build()
3989            .await;
3990
3991        // This call to the new client hits the on-disk cache.
3992        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3993
3994        // Then this call hits the in-memory cache.
3995        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3996
3997        drop(well_known_mock);
3998
3999        // Now, reset the cache, and observe the endpoints being called again once.
4000        client.reset_well_known().await.unwrap();
4001
4002        server
4003            .mock_well_known()
4004            .error_unrecognized()
4005            .expect(1)
4006            .named("second well-known mock")
4007            .mount()
4008            .await;
4009
4010        // Hits network again.
4011        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4012        // Hits in-memory cache again.
4013        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4014    }
4015
4016    #[async_test]
4017    async fn test_no_network_doesnt_cause_infinite_retries() {
4018        // We want infinite retries for transient errors.
4019        let client = MockClientBuilder::new(None)
4020            .on_builder(|builder| builder.request_config(RequestConfig::new()))
4021            .build()
4022            .await;
4023
4024        // We don't define a mock server on purpose here, so that the error is really a
4025        // network error.
4026        client.whoami().await.unwrap_err();
4027    }
4028
4029    #[async_test]
4030    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4031        let server = MatrixMockServer::new().await;
4032        let client = server.client_builder().build().await;
4033
4034        let room_id = room_id!("!room:example.org");
4035
4036        server
4037            .mock_sync()
4038            .ok_and_run(&client, |builder| {
4039                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4040            })
4041            .await;
4042
4043        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4044        assert_eq!(room.room_id(), room_id);
4045    }
4046
4047    #[async_test]
4048    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4049        let server = MatrixMockServer::new().await;
4050        let client = server.client_builder().build().await;
4051
4052        let room_id = room_id!("!room:example.org");
4053
4054        let client = Arc::new(client);
4055
4056        // Perform the /sync request with a delay so it starts after the
4057        // `await_room_remote_echo` call has happened
4058        spawn({
4059            let client = client.clone();
4060            async move {
4061                sleep(Duration::from_millis(100)).await;
4062
4063                server
4064                    .mock_sync()
4065                    .ok_and_run(&client, |builder| {
4066                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4067                    })
4068                    .await;
4069            }
4070        });
4071
4072        let room =
4073            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4074        assert_eq!(room.room_id(), room_id);
4075    }
4076
4077    #[async_test]
4078    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4079        let client = MockClientBuilder::new(None).build().await;
4080
4081        let room_id = room_id!("!room:example.org");
4082        // Room is not present so the client won't be able to find it. The call will
4083        // timeout.
4084        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4085    }
4086
4087    #[async_test]
4088    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4089        let server = MatrixMockServer::new().await;
4090        let client = server.client_builder().build().await;
4091
4092        server.mock_create_room().ok().mount().await;
4093
4094        // Create a room in the internal store
4095        let room = client
4096            .create_room(assign!(CreateRoomRequest::new(), {
4097                invite: vec![],
4098                is_direct: false,
4099            }))
4100            .await
4101            .unwrap();
4102
4103        // Room is locally present, but not synced, the call will timeout
4104        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4105            .await
4106            .unwrap_err();
4107    }
4108
4109    #[async_test]
4110    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4111        let server = MatrixMockServer::new().await;
4112        let client = server.client_builder().build().await;
4113
4114        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4115
4116        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4117        assert_matches!(ret, Ok(true));
4118    }
4119
4120    #[async_test]
4121    async fn test_is_room_alias_available_if_alias_is_resolved() {
4122        let server = MatrixMockServer::new().await;
4123        let client = server.client_builder().build().await;
4124
4125        server
4126            .mock_room_directory_resolve_alias()
4127            .ok("!some_room_id:matrix.org", Vec::new())
4128            .expect(1)
4129            .mount()
4130            .await;
4131
4132        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4133        assert_matches!(ret, Ok(false));
4134    }
4135
4136    #[async_test]
4137    async fn test_is_room_alias_available_if_error_found() {
4138        let server = MatrixMockServer::new().await;
4139        let client = server.client_builder().build().await;
4140
4141        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4142
4143        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4144        assert_matches!(ret, Err(_));
4145    }
4146
4147    #[async_test]
4148    async fn test_create_room_alias() {
4149        let server = MatrixMockServer::new().await;
4150        let client = server.client_builder().build().await;
4151
4152        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4153
4154        let ret = client
4155            .create_room_alias(
4156                room_alias_id!("#some_alias:matrix.org"),
4157                room_id!("!some_room:matrix.org"),
4158            )
4159            .await;
4160        assert_matches!(ret, Ok(()));
4161    }
4162
4163    #[async_test]
4164    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4165        let server = MatrixMockServer::new().await;
4166        let client = server.client_builder().build().await;
4167
4168        let room_id = room_id!("!a-room:matrix.org");
4169
4170        // Make sure the summary endpoint is called once
4171        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4172
4173        // We create a locally cached invited room
4174        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4175
4176        // And we get a preview, the server endpoint was reached
4177        let preview = client
4178            .get_room_preview(room_id.into(), Vec::new())
4179            .await
4180            .expect("Room preview should be retrieved");
4181
4182        assert_eq!(invited_room.room_id(), preview.room_id);
4183    }
4184
4185    #[async_test]
4186    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4187        let server = MatrixMockServer::new().await;
4188        let client = server.client_builder().build().await;
4189
4190        let room_id = room_id!("!a-room:matrix.org");
4191
4192        // Make sure the summary endpoint is called once
4193        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4194
4195        // We create a locally cached left room
4196        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4197
4198        // And we get a preview, the server endpoint was reached
4199        let preview = client
4200            .get_room_preview(room_id.into(), Vec::new())
4201            .await
4202            .expect("Room preview should be retrieved");
4203
4204        assert_eq!(left_room.room_id(), preview.room_id);
4205    }
4206
4207    #[async_test]
4208    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4209        let server = MatrixMockServer::new().await;
4210        let client = server.client_builder().build().await;
4211
4212        let room_id = room_id!("!a-room:matrix.org");
4213
4214        // Make sure the summary endpoint is called once
4215        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4216
4217        // We create a locally cached knocked room
4218        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4219
4220        // And we get a preview, the server endpoint was reached
4221        let preview = client
4222            .get_room_preview(room_id.into(), Vec::new())
4223            .await
4224            .expect("Room preview should be retrieved");
4225
4226        assert_eq!(knocked_room.room_id(), preview.room_id);
4227    }
4228
4229    #[async_test]
4230    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4231        let server = MatrixMockServer::new().await;
4232        let client = server.client_builder().build().await;
4233
4234        let room_id = room_id!("!a-room:matrix.org");
4235
4236        // Make sure the summary endpoint is not called
4237        server.mock_room_summary().ok(room_id).never().mount().await;
4238
4239        // We create a locally cached joined room
4240        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4241
4242        // And we get a preview, no server endpoint was reached
4243        let preview = client
4244            .get_room_preview(room_id.into(), Vec::new())
4245            .await
4246            .expect("Room preview should be retrieved");
4247
4248        assert_eq!(joined_room.room_id(), preview.room_id);
4249    }
4250
4251    #[async_test]
4252    async fn test_media_preview_config() {
4253        let server = MatrixMockServer::new().await;
4254        let client = server.client_builder().build().await;
4255
4256        server
4257            .mock_sync()
4258            .ok_and_run(&client, |builder| {
4259                builder.add_custom_global_account_data(json!({
4260                    "content": {
4261                        "media_previews": "private",
4262                        "invite_avatars": "off"
4263                    },
4264                    "type": "m.media_preview_config"
4265                }));
4266            })
4267            .await;
4268
4269        let (initial_value, stream) =
4270            client.account().observe_media_preview_config().await.unwrap();
4271
4272        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4273        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4274        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4275        pin_mut!(stream);
4276        assert_pending!(stream);
4277
4278        server
4279            .mock_sync()
4280            .ok_and_run(&client, |builder| {
4281                builder.add_custom_global_account_data(json!({
4282                    "content": {
4283                        "media_previews": "off",
4284                        "invite_avatars": "on"
4285                    },
4286                    "type": "m.media_preview_config"
4287                }));
4288            })
4289            .await;
4290
4291        assert_next_matches!(
4292            stream,
4293            MediaPreviewConfigEventContent {
4294                media_previews: Some(MediaPreviews::Off),
4295                invite_avatars: Some(InviteAvatars::On),
4296                ..
4297            }
4298        );
4299        assert_pending!(stream);
4300    }
4301
4302    #[async_test]
4303    async fn test_unstable_media_preview_config() {
4304        let server = MatrixMockServer::new().await;
4305        let client = server.client_builder().build().await;
4306
4307        server
4308            .mock_sync()
4309            .ok_and_run(&client, |builder| {
4310                builder.add_custom_global_account_data(json!({
4311                    "content": {
4312                        "media_previews": "private",
4313                        "invite_avatars": "off"
4314                    },
4315                    "type": "io.element.msc4278.media_preview_config"
4316                }));
4317            })
4318            .await;
4319
4320        let (initial_value, stream) =
4321            client.account().observe_media_preview_config().await.unwrap();
4322
4323        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4324        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4325        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4326        pin_mut!(stream);
4327        assert_pending!(stream);
4328
4329        server
4330            .mock_sync()
4331            .ok_and_run(&client, |builder| {
4332                builder.add_custom_global_account_data(json!({
4333                    "content": {
4334                        "media_previews": "off",
4335                        "invite_avatars": "on"
4336                    },
4337                    "type": "io.element.msc4278.media_preview_config"
4338                }));
4339            })
4340            .await;
4341
4342        assert_next_matches!(
4343            stream,
4344            MediaPreviewConfigEventContent {
4345                media_previews: Some(MediaPreviews::Off),
4346                invite_avatars: Some(InviteAvatars::On),
4347                ..
4348            }
4349        );
4350        assert_pending!(stream);
4351    }
4352
4353    #[async_test]
4354    async fn test_media_preview_config_not_found() {
4355        let server = MatrixMockServer::new().await;
4356        let client = server.client_builder().build().await;
4357
4358        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4359
4360        assert!(initial_value.is_none());
4361    }
4362
4363    #[async_test]
4364    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4365        // The default Matrix version we use is 1.11 or higher, so authenticated media
4366        // is supported.
4367        let server = MatrixMockServer::new().await;
4368        let client = server.client_builder().build().await;
4369
4370        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4371
4372        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4373        client.load_or_fetch_max_upload_size().await.unwrap();
4374
4375        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4376    }
4377
4378    #[async_test]
4379    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4380        // The server must advertise support for the stable feature for authenticated
4381        // media support, so we mock the `GET /versions` response.
4382        let server = MatrixMockServer::new().await;
4383        let client = server.client_builder().no_server_versions().build().await;
4384
4385        server
4386            .mock_versions()
4387            .ok_custom(
4388                &["v1.7", "v1.8", "v1.9", "v1.10"],
4389                &[("org.matrix.msc3916.stable", true)].into(),
4390            )
4391            .named("versions")
4392            .expect(1)
4393            .mount()
4394            .await;
4395
4396        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4397
4398        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4399        client.load_or_fetch_max_upload_size().await.unwrap();
4400
4401        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4402    }
4403
4404    #[async_test]
4405    async fn test_load_or_fetch_max_upload_size_no_auth() {
4406        // The server must not support Matrix 1.11 or higher for unauthenticated
4407        // media requests, so we mock the `GET /versions` response.
4408        let server = MatrixMockServer::new().await;
4409        let client = server.client_builder().no_server_versions().build().await;
4410
4411        server
4412            .mock_versions()
4413            .ok_custom(&["v1.1"], &Default::default())
4414            .named("versions")
4415            .expect(1)
4416            .mount()
4417            .await;
4418
4419        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4420
4421        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4422        client.load_or_fetch_max_upload_size().await.unwrap();
4423
4424        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4425    }
4426
4427    #[async_test]
4428    async fn test_uploading_a_too_large_media_file() {
4429        let server = MatrixMockServer::new().await;
4430        let client = server.client_builder().build().await;
4431
4432        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4433        client.load_or_fetch_max_upload_size().await.unwrap();
4434        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4435
4436        let data = vec![1, 2];
4437        let upload_request =
4438            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4439        let request = SendRequest {
4440            client: client.clone(),
4441            request: upload_request,
4442            config: None,
4443            send_progress: SharedObservable::new(TransmissionProgress::default()),
4444        };
4445        let media_request = SendMediaUploadRequest::new(request);
4446
4447        let error = media_request.await.err();
4448        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4449        assert_eq!(max, uint!(1));
4450        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4451    }
4452
4453    #[async_test]
4454    async fn test_dont_ignore_timeout_on_first_sync() {
4455        let server = MatrixMockServer::new().await;
4456        let client = server.client_builder().build().await;
4457
4458        server
4459            .mock_sync()
4460            .timeout(Some(Duration::from_secs(30)))
4461            .ok(|_| {})
4462            .mock_once()
4463            .named("sync_with_timeout")
4464            .mount()
4465            .await;
4466
4467        // Call the endpoint once to check the timeout.
4468        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4469
4470        timeout(Duration::from_secs(1), async {
4471            stream.next().await.unwrap().unwrap();
4472        })
4473        .await
4474        .unwrap();
4475    }
4476
4477    #[async_test]
4478    async fn test_ignore_timeout_on_first_sync() {
4479        let server = MatrixMockServer::new().await;
4480        let client = server.client_builder().build().await;
4481
4482        server
4483            .mock_sync()
4484            .timeout(None)
4485            .ok(|_| {})
4486            .mock_once()
4487            .named("sync_no_timeout")
4488            .mount()
4489            .await;
4490        server
4491            .mock_sync()
4492            .timeout(Some(Duration::from_secs(30)))
4493            .ok(|_| {})
4494            .mock_once()
4495            .named("sync_with_timeout")
4496            .mount()
4497            .await;
4498
4499        // Call each version of the endpoint once to check the timeouts.
4500        let mut stream = Box::pin(
4501            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4502        );
4503
4504        timeout(Duration::from_secs(1), async {
4505            stream.next().await.unwrap().unwrap();
4506            stream.next().await.unwrap().unwrap();
4507        })
4508        .await
4509        .unwrap();
4510    }
4511
4512    #[async_test]
4513    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4514        let server = MatrixMockServer::new().await;
4515        let client = server.client_builder().build().await;
4516        // This is the user ID that is inside MemberAdditional.
4517        // Note the confusing username, so we can share
4518        // GlobalAccountDataTestEvent::Direct with the invited test.
4519        let user_id = user_id!("@invited:localhost");
4520
4521        // When we receive a sync response saying "invited" is invited to a DM
4522        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4523        let response = SyncResponseBuilder::default()
4524            .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4525            .add_global_account_data(
4526                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4527            )
4528            .build_sync_response();
4529        client.base_client().receive_sync_response(response).await.unwrap();
4530
4531        // Then get_dm_room finds this room
4532        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4533        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4534    }
4535
4536    #[async_test]
4537    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4538        let server = MatrixMockServer::new().await;
4539        let client = server.client_builder().build().await;
4540        // This is the user ID that is inside MemberInvite
4541        let user_id = user_id!("@invited:localhost");
4542
4543        // When we receive a sync response saying "invited" is invited to a DM
4544        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4545        let response = SyncResponseBuilder::default()
4546            .add_joined_room(
4547                JoinedRoomBuilder::default()
4548                    .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4549            )
4550            .add_global_account_data(
4551                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4552            )
4553            .build_sync_response();
4554        client.base_client().receive_sync_response(response).await.unwrap();
4555
4556        // Then get_dm_room finds this room
4557        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4558        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4559    }
4560
4561    #[async_test]
4562    async fn test_get_dm_room_still_finds_left_room() {
4563        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4564        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4565
4566        let server = MatrixMockServer::new().await;
4567        let client = server.client_builder().build().await;
4568        // This is the user ID that is inside MemberAdditional.
4569        // Note the confusing username, so we can share
4570        // GlobalAccountDataTestEvent::Direct with the invited test.
4571        let user_id = user_id!("@invited:localhost");
4572
4573        // When we receive a sync response saying "invited" has left a DM
4574        let f = EventFactory::new().sender(user_id);
4575        let response = SyncResponseBuilder::default()
4576            .add_joined_room(
4577                JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4578            )
4579            .add_global_account_data(
4580                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4581            )
4582            .build_sync_response();
4583        client.base_client().receive_sync_response(response).await.unwrap();
4584
4585        // Then get_dm_room finds this room
4586        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4587        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4588    }
4589
4590    #[async_test]
4591    async fn test_device_exists() {
4592        let server = MatrixMockServer::new().await;
4593        let client = server.client_builder().build().await;
4594
4595        server.mock_get_device().ok().expect(1).mount().await;
4596
4597        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4598    }
4599
4600    #[async_test]
4601    async fn test_device_exists_404() {
4602        let server = MatrixMockServer::new().await;
4603        let client = server.client_builder().build().await;
4604
4605        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4606    }
4607
4608    #[async_test]
4609    async fn test_device_exists_500() {
4610        let server = MatrixMockServer::new().await;
4611        let client = server.client_builder().build().await;
4612
4613        server.mock_get_device().error500().expect(1).mount().await;
4614
4615        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4616    }
4617
4618    #[async_test]
4619    async fn test_fetching_well_known_with_homeserver_url() {
4620        let server = MatrixMockServer::new().await;
4621        let client = server.client_builder().build().await;
4622        server.mock_well_known().ok().mount().await;
4623
4624        assert_matches!(client.fetch_client_well_known().await, Some(_));
4625    }
4626
4627    #[async_test]
4628    async fn test_fetching_well_known_with_server_name() {
4629        let server = MatrixMockServer::new().await;
4630        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4631
4632        server.mock_well_known().ok().mount().await;
4633
4634        let client = MockClientBuilder::new(None)
4635            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4636            .build()
4637            .await;
4638
4639        assert_matches!(client.fetch_client_well_known().await, Some(_));
4640    }
4641
4642    #[async_test]
4643    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4644        let server = MatrixMockServer::new().await;
4645        server.mock_well_known().ok().mount().await;
4646
4647        let user_id =
4648            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4649        let client = MockClientBuilder::new(None)
4650            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4651            .build()
4652            .await;
4653
4654        assert_matches!(client.fetch_client_well_known().await, Some(_));
4655    }
4656}