Skip to main content

matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::{StreamExt, join};
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32    DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35    BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36    SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37    SyncOutsideWasm, ThreadingSupport,
38    event_cache::store::EventCacheStoreLock,
39    media::store::MediaStoreLock,
40    store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41    sync::{Notification, RoomUpdates},
42    task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50    api::{
51        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52        client::{
53            account::whoami,
54            alias::{create_alias, delete_alias, get_alias},
55            authenticated_media,
56            device::{self, delete_devices, get_devices, update_device},
57            directory::{get_public_rooms, get_public_rooms_filtered},
58            discovery::{discover_homeserver, get_supported_versions},
59            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
60            knock::knock_room,
61            media,
62            membership::{join_room_by_id, join_room_by_id_or_alias},
63            room::create_room,
64            rtc::RtcTransport,
65            session::login::v3::DiscoveryInfo,
66            sync::sync_events,
67            threads::get_thread_subscriptions_changes,
68            uiaa,
69            user_directory::search_users,
70        },
71        error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
72        path_builder::PathBuilder,
73    },
74    assign,
75    events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
76    push::Ruleset,
77    time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
82use url::Url;
83
84use self::{
85    caches::{Cache, CachedValue, ClientCaches},
86    futures::SendRequest,
87};
88use crate::{
89    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
90    Room, SessionTokens, TransmissionProgress,
91    authentication::{
92        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
93        oauth::OAuth,
94    },
95    client::{
96        homeserver_capabilities::HomeserverCapabilities,
97        thread_subscriptions::ThreadSubscriptionCatchup,
98    },
99    config::{RequestConfig, SyncToken},
100    deduplicating_handler::DeduplicatingHandler,
101    error::HttpResult,
102    event_cache::EventCache,
103    event_handler::{
104        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
105        EventHandlerStore, ObservableEventHandler, SyncEvent,
106    },
107    http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
108    latest_events::LatestEvents,
109    live_locations_observer::BeaconInfoUpdate,
110    media::{MediaError, MediaFetcher},
111    notification_settings::NotificationSettings,
112    room::RoomMember,
113    room_preview::RoomPreview,
114    send_queue::{SendQueue, SendQueueData},
115    sliding_sync::Version as SlidingSyncVersion,
116    sync::{RoomUpdate, SyncResponse},
117};
118#[cfg(feature = "e2e-encryption")]
119use crate::{
120    cross_process_lock::CrossProcessLock,
121    encryption::{
122        DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
123        VerificationState,
124    },
125};
126
127mod builder;
128pub(crate) mod caches;
129pub(crate) mod futures;
130pub(crate) mod homeserver_capabilities;
131pub(crate) mod thread_subscriptions;
132
133pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
134#[cfg(feature = "experimental-search")]
135use crate::search_index::SearchIndex;
136
137#[cfg(not(target_family = "wasm"))]
138type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
139#[cfg(target_family = "wasm")]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
141
142#[cfg(not(target_family = "wasm"))]
143type NotificationHandlerFn =
144    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
145#[cfg(target_family = "wasm")]
146type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
147
148/// Enum controlling if a loop running callbacks should continue or abort.
149///
150/// This is mainly used in the [`sync_with_callback`] method, the return value
151/// of the provided callback controls if the sync loop should be exited.
152///
153/// [`sync_with_callback`]: #method.sync_with_callback
154#[derive(Debug, Clone, Copy, PartialEq, Eq)]
155pub enum LoopCtrl {
156    /// Continue running the loop.
157    Continue,
158    /// Break out of the loop.
159    Break,
160}
161
162/// Represents changes that can occur to a `Client`s `Session`.
163#[derive(Debug, Clone, PartialEq)]
164pub enum SessionChange {
165    /// The session's token is no longer valid.
166    UnknownToken(UnknownTokenErrorData),
167    /// The session's tokens have been refreshed.
168    TokensRefreshed,
169}
170
171/// Information about the server vendor obtained from the federation API.
172#[derive(Debug, Clone, PartialEq, Eq)]
173#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
174pub struct ServerVendorInfo {
175    /// The server name.
176    pub server_name: String,
177    /// The server version.
178    pub version: String,
179}
180
181/// Information about a map tile server advertised by the homeserver through the
182/// `tile_server` field of the matrix client well-known (MSC3488).
183#[derive(Debug, Clone, PartialEq, Eq, Hash)]
184#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
185pub struct TileServerInfo {
186    /// The URL of a map tile server's `style.json` file. See the
187    /// [Mapbox Style Specification](https://docs.mapbox.com/mapbox-gl-js/style-spec/)
188    /// for more details.
189    pub map_style_url: String,
190}
191
192impl From<discover_homeserver::TileServerInfo> for TileServerInfo {
193    fn from(value: discover_homeserver::TileServerInfo) -> Self {
194        Self { map_style_url: value.map_style_url }
195    }
196}
197
198/// An async/await enabled Matrix client.
199///
200/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
201#[derive(Clone)]
202pub struct Client {
203    pub(crate) inner: Arc<ClientInner>,
204}
205
206#[derive(Default)]
207pub(crate) struct ClientLocks {
208    /// Lock ensuring that only a single room may be marked as a DM at once.
209    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
210    /// explanation.
211    pub(crate) mark_as_dm_lock: Mutex<()>,
212
213    /// Lock ensuring that only a single secret store is getting opened at the
214    /// same time.
215    ///
216    /// This is important so we don't accidentally create multiple different new
217    /// default secret storage keys.
218    #[cfg(feature = "e2e-encryption")]
219    pub(crate) open_secret_store_lock: Mutex<()>,
220
221    /// Lock ensuring that we're only storing a single secret at a time.
222    ///
223    /// Take a look at the [`SecretStore::put_secret`] method for a more
224    /// detailed explanation.
225    ///
226    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
227    #[cfg(feature = "e2e-encryption")]
228    pub(crate) store_secret_lock: Mutex<()>,
229
230    /// Lock ensuring that only one method at a time might modify our backup.
231    #[cfg(feature = "e2e-encryption")]
232    pub(crate) backup_modify_lock: Mutex<()>,
233
234    /// Lock ensuring that we're going to attempt to upload backups for a single
235    /// requester.
236    #[cfg(feature = "e2e-encryption")]
237    pub(crate) backup_upload_lock: Mutex<()>,
238
239    /// Handler making sure we only have one group session sharing request in
240    /// flight per room.
241    #[cfg(feature = "e2e-encryption")]
242    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
243
244    /// Lock making sure we're only doing one key claim request at a time.
245    #[cfg(feature = "e2e-encryption")]
246    pub(crate) key_claim_lock: Mutex<()>,
247
248    /// Handler to ensure that only one members request is running at a time,
249    /// given a room.
250    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
251
252    /// Handler to ensure that only one encryption state request is running at a
253    /// time, given a room.
254    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
255
256    /// Deduplicating handler for sending read receipts. The string is an
257    /// internal implementation detail, see [`Self::send_single_receipt`].
258    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
259
260    #[cfg(feature = "e2e-encryption")]
261    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
262
263    /// Latest "generation" of data known by the crypto store.
264    ///
265    /// This is a counter that only increments, set in the database (and can
266    /// wrap). It's incremented whenever some process acquires a lock for the
267    /// first time. *This assumes the crypto store lock is being held, to
268    /// avoid data races on writing to this value in the store*.
269    ///
270    /// The current process will maintain this value in local memory and in the
271    /// DB over time. Observing a different value than the one read in
272    /// memory, when reading from the store indicates that somebody else has
273    /// written into the database under our feet.
274    ///
275    /// TODO: this should live in the `OlmMachine`, since it's information
276    /// related to the lock. As of today (2023-07-28), we blow up the entire
277    /// olm machine when there's a generation mismatch. So storing the
278    /// generation in the olm machine would make the client think there's
279    /// *always* a mismatch, and that's why we need to store the generation
280    /// outside the `OlmMachine`.
281    #[cfg(feature = "e2e-encryption")]
282    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
283}
284
285pub(crate) struct ClientInner {
286    /// All the data related to authentication and authorization.
287    pub(crate) auth_ctx: Arc<AuthCtx>,
288
289    /// The URL of the server.
290    ///
291    /// Not to be confused with the `Self::homeserver`. `server` is usually
292    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
293    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
294    /// homeserver (at the time of writing — 2024-08-28).
295    ///
296    /// This value is optional depending on how the `Client` has been built.
297    /// If it's been built from a homeserver URL directly, we don't know the
298    /// server. However, if the `Client` has been built from a server URL or
299    /// name, then the homeserver has been discovered, and we know both.
300    server: Option<Url>,
301
302    /// The URL of the homeserver to connect to.
303    ///
304    /// This is the URL for the client-server Matrix API.
305    homeserver: StdRwLock<Url>,
306
307    /// The sliding sync version.
308    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
309
310    /// The underlying HTTP client.
311    pub(crate) http_client: HttpClient,
312
313    /// User session data.
314    pub(super) base_client: BaseClient,
315
316    /// Collection of in-memory caches for the [`Client`].
317    pub(crate) caches: ClientCaches,
318
319    /// Collection of locks individual client methods might want to use, either
320    /// to ensure that only a single call to a method happens at once or to
321    /// deduplicate multiple calls to a method.
322    pub(crate) locks: ClientLocks,
323
324    /// The cross-process lock configuration.
325    ///
326    /// The SDK provides cross-process store locks (see
327    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
328    /// [`CrossProcessLockConfig::MultiProcess`] is used.
329    ///
330    /// If multiple `Client`s are running in different processes, this
331    /// value MUST be different for each `Client`.
332    cross_process_lock_config: CrossProcessLockConfig,
333
334    /// A mapping of the times at which the current user sent typing notices,
335    /// keyed by room.
336    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
337
338    /// Event handlers. See `add_event_handler`.
339    pub(crate) event_handlers: EventHandlerStore,
340
341    /// Notification handlers. See `register_notification_handler`.
342    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
343
344    /// The sender-side of channels used to receive room updates.
345    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
346
347    /// The sender-side of a channel used to observe all the room updates of a
348    /// sync response.
349    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
350
351    /// Whether the client should update its homeserver URL with the discovery
352    /// information present in the login response.
353    respect_login_well_known: bool,
354
355    /// An event that can be listened on to wait for a successful sync. The
356    /// event will only be fired if a sync loop is running. Can be used for
357    /// synchronization, e.g. if we send out a request to create a room, we can
358    /// wait for the sync to get the data to fetch a room object from the state
359    /// store.
360    pub(crate) sync_beat: event_listener::Event,
361
362    /// A central cache for events, inactive first.
363    ///
364    /// It becomes active when [`EventCache::subscribe`] is called.
365    pub(crate) event_cache: OnceCell<EventCache>,
366
367    /// End-to-end encryption related state.
368    #[cfg(feature = "e2e-encryption")]
369    pub(crate) e2ee: EncryptionData,
370
371    /// The verification state of our own device.
372    #[cfg(feature = "e2e-encryption")]
373    pub(crate) verification_state: SharedObservable<VerificationState>,
374
375    /// Whether to enable the experimental support for sending and receiving
376    /// encrypted room history on invite, per [MSC4268].
377    ///
378    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
379    #[cfg(feature = "e2e-encryption")]
380    pub(crate) enable_share_history_on_invite: bool,
381
382    /// Data related to the [`SendQueue`].
383    ///
384    /// [`SendQueue`]: crate::send_queue::SendQueue
385    pub(crate) send_queue_data: Arc<SendQueueData>,
386
387    /// The `max_upload_size` value of the homeserver, it contains the max
388    /// request size you can send.
389    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
390
391    /// The entry point to get the [`LatestEvent`] of rooms and threads.
392    ///
393    /// [`LatestEvent`]: crate::latest_event::LatestEvent
394    latest_events: OnceCell<LatestEvents>,
395
396    /// Service handling the catching up of thread subscriptions in the
397    /// background.
398    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
399
400    #[cfg(feature = "experimental-search")]
401    /// Handler for [`RoomIndex`]'s of each room
402    search_index: SearchIndex,
403
404    /// A monitor for background tasks spawned by the client.
405    pub(crate) task_monitor: TaskMonitor,
406
407    /// A sender to notify subscribers about duplicate key upload errors
408    /// triggered by requests to /keys/upload.
409    #[cfg(feature = "e2e-encryption")]
410    pub(crate) duplicate_key_upload_error_sender:
411        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
412
413    pub(crate) media_fetcher: Arc<dyn MediaFetcher>,
414}
415
416impl ClientInner {
417    /// Create a new `ClientInner`.
418    ///
419    /// All the fields passed as parameters here are those that must be cloned
420    /// upon instantiation of a sub-client, e.g. a client specialized for
421    /// notifications.
422    #[allow(clippy::too_many_arguments)]
423    async fn new(
424        auth_ctx: Arc<AuthCtx>,
425        server: Option<Url>,
426        homeserver: Url,
427        sliding_sync_version: SlidingSyncVersion,
428        http_client: HttpClient,
429        base_client: BaseClient,
430        supported_versions: CachedValue<TtlValue<SupportedVersions>>,
431        well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
432        respect_login_well_known: bool,
433        event_cache: OnceCell<EventCache>,
434        send_queue: Arc<SendQueueData>,
435        latest_events: OnceCell<LatestEvents>,
436        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
437        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
438        cross_process_lock_config: CrossProcessLockConfig,
439        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
440        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
441        media_fetcher: Arc<dyn MediaFetcher>,
442    ) -> Arc<Self> {
443        let caches = ClientCaches {
444            supported_versions: Cache::with_value(supported_versions),
445            well_known: Cache::with_value(well_known),
446            server_metadata: Cache::new(),
447            homeserver_capabilities: Cache::new(),
448        };
449
450        let client = Self {
451            server,
452            homeserver: StdRwLock::new(homeserver),
453            auth_ctx,
454            sliding_sync_version: StdRwLock::new(sliding_sync_version),
455            http_client,
456            base_client,
457            caches,
458            locks: Default::default(),
459            cross_process_lock_config,
460            typing_notice_times: Default::default(),
461            event_handlers: Default::default(),
462            notification_handlers: Default::default(),
463            room_update_channels: Default::default(),
464            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
465            // ballast for all observers to catch up.
466            room_updates_sender: broadcast::Sender::new(32),
467            respect_login_well_known,
468            sync_beat: event_listener::Event::new(),
469            event_cache,
470            send_queue_data: send_queue,
471            latest_events,
472            #[cfg(feature = "e2e-encryption")]
473            e2ee: EncryptionData::new(encryption_settings),
474            #[cfg(feature = "e2e-encryption")]
475            verification_state: SharedObservable::new(VerificationState::Unknown),
476            #[cfg(feature = "e2e-encryption")]
477            enable_share_history_on_invite,
478            server_max_upload_size: Mutex::new(OnceCell::new()),
479            #[cfg(feature = "experimental-search")]
480            search_index: search_index_handler,
481            thread_subscription_catchup,
482            task_monitor: TaskMonitor::new(),
483            #[cfg(feature = "e2e-encryption")]
484            duplicate_key_upload_error_sender: broadcast::channel(1).0,
485            media_fetcher,
486        };
487
488        #[allow(clippy::let_and_return)]
489        let client = Arc::new(client);
490
491        #[cfg(feature = "e2e-encryption")]
492        client.e2ee.initialize_tasks(&client);
493
494        let init_event_cache = client.event_cache.get_or_init(|| async {
495            EventCache::new(&client, client.base_client.event_cache_store().clone())
496        });
497
498        let init_thread_subscription_catchup = client
499            .thread_subscription_catchup
500            .get_or_init(|| ThreadSubscriptionCatchup::new(Client { inner: client.clone() }));
501
502        let _ = join!(init_event_cache, init_thread_subscription_catchup);
503
504        client
505    }
506}
507
508#[cfg(not(tarpaulin_include))]
509impl Debug for Client {
510    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
511        write!(fmt, "Client")
512    }
513}
514
515impl Client {
516    /// Create a new [`Client`] that will use the given homeserver.
517    ///
518    /// # Arguments
519    ///
520    /// * `homeserver_url` - The homeserver that the client should connect to.
521    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
522        Self::builder().homeserver_url(homeserver_url).build().await
523    }
524
525    /// Returns a subscriber that publishes an event every time the ignore user
526    /// list changes.
527    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
528        self.inner.base_client.subscribe_to_ignore_user_list_changes()
529    }
530
531    /// Create a new [`ClientBuilder`].
532    pub fn builder() -> ClientBuilder {
533        ClientBuilder::new()
534    }
535
536    pub(crate) fn base_client(&self) -> &BaseClient {
537        &self.inner.base_client
538    }
539
540    /// The underlying HTTP client.
541    pub fn http_client(&self) -> &reqwest::Client {
542        &self.inner.http_client.inner
543    }
544
545    pub(crate) fn locks(&self) -> &ClientLocks {
546        &self.inner.locks
547    }
548
549    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
550        &self.inner.auth_ctx
551    }
552
553    /// The cross-process store lock configuration used by this [`Client`].
554    ///
555    /// The SDK provides cross-process store locks (see
556    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
557    /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
558    /// the value used for all cross-process store locks used by this
559    /// `Client`.
560    pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
561        &self.inner.cross_process_lock_config
562    }
563
564    /// Change the homeserver URL used by this client.
565    ///
566    /// # Arguments
567    ///
568    /// * `homeserver_url` - The new URL to use.
569    fn set_homeserver(&self, homeserver_url: Url) {
570        *self.inner.homeserver.write().unwrap() = homeserver_url;
571    }
572
573    /// Change to a different homeserver and re-resolve well-known.
574    #[cfg(feature = "e2e-encryption")]
575    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
576        &self,
577        homeserver_url: Url,
578    ) -> Result<()> {
579        self.set_homeserver(homeserver_url);
580        self.reset_well_known().await?;
581        if let Some(well_known) = self.well_known().await {
582            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
583        }
584        Ok(())
585    }
586
587    /// Retrieves a helper component to access the [`HomeserverCapabilities`]
588    /// supported or disabled by the homeserver.
589    pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
590        HomeserverCapabilities::new(self.clone())
591    }
592
593    /// Get the server vendor information from the federation API.
594    ///
595    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
596    /// both the server's software name and version.
597    ///
598    /// # Examples
599    ///
600    /// ```no_run
601    /// # use matrix_sdk::Client;
602    /// # use url::Url;
603    /// # async {
604    /// # let homeserver = Url::parse("http://example.com")?;
605    /// let client = Client::new(homeserver).await?;
606    ///
607    /// let server_info = client.server_vendor_info(None).await?;
608    /// println!(
609    ///     "Server: {}, Version: {}",
610    ///     server_info.server_name, server_info.version
611    /// );
612    /// # anyhow::Ok(()) };
613    /// ```
614    #[cfg(feature = "federation-api")]
615    pub async fn server_vendor_info(
616        &self,
617        request_config: Option<RequestConfig>,
618    ) -> HttpResult<ServerVendorInfo> {
619        use ruma::api::federation::discovery::get_server_version;
620
621        let res = self
622            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
623            .await?;
624
625        // Extract server info, using defaults if fields are missing.
626        let server = res.server.unwrap_or_default();
627        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
628        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
629
630        Ok(ServerVendorInfo { server_name: server_name_str, version })
631    }
632
633    /// Get a copy of the default request config.
634    ///
635    /// The default request config is what's used when sending requests if no
636    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
637    /// function with such a parameter.
638    ///
639    /// If the default request config was not customized through
640    /// [`ClientBuilder`] when creating this `Client`, the returned value will
641    /// be equivalent to [`RequestConfig::default()`].
642    pub fn request_config(&self) -> RequestConfig {
643        self.inner.http_client.request_config
644    }
645
646    /// Check whether the client has been activated.
647    ///
648    /// A client is considered active when:
649    ///
650    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
651    ///    is logged in,
652    /// 2. Has loaded cached data from storage,
653    /// 3. If encryption is enabled, it also initialized or restored its
654    ///    `OlmMachine`.
655    pub fn is_active(&self) -> bool {
656        self.inner.base_client.is_active()
657    }
658
659    /// The server used by the client.
660    ///
661    /// See `Self::server` to learn more.
662    pub fn server(&self) -> Option<&Url> {
663        self.inner.server.as_ref()
664    }
665
666    /// The homeserver of the client.
667    pub fn homeserver(&self) -> Url {
668        self.inner.homeserver.read().unwrap().clone()
669    }
670
671    /// Get the sliding sync version.
672    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
673        self.inner.sliding_sync_version.read().unwrap().clone()
674    }
675
676    /// Override the sliding sync version.
677    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
678        let mut lock = self.inner.sliding_sync_version.write().unwrap();
679        *lock = version;
680    }
681
682    /// Get the Matrix user session meta information.
683    ///
684    /// If the client is currently logged in, this will return a
685    /// [`SessionMeta`] object which contains the user ID and device ID.
686    /// Otherwise it returns `None`.
687    pub fn session_meta(&self) -> Option<&SessionMeta> {
688        self.base_client().session_meta()
689    }
690
691    /// Returns a receiver that gets events for each room info update. To watch
692    /// for new events, use `receiver.resubscribe()`.
693    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
694        self.base_client().room_info_notable_update_receiver()
695    }
696
697    /// Performs a search for users.
698    /// The search is performed case-insensitively on user IDs and display names
699    ///
700    /// # Arguments
701    ///
702    /// * `search_term` - The search term for the search
703    /// * `limit` - The maximum number of results to return. Defaults to 10.
704    ///
705    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
706    pub async fn search_users(
707        &self,
708        search_term: &str,
709        limit: u64,
710    ) -> HttpResult<search_users::v3::Response> {
711        let mut request = search_users::v3::Request::new(search_term.to_owned());
712
713        if let Some(limit) = UInt::new(limit) {
714            request.limit = limit;
715        }
716
717        self.send(request).await
718    }
719
720    /// Get the user id of the current owner of the client.
721    pub fn user_id(&self) -> Option<&UserId> {
722        self.session_meta().map(|s| s.user_id.as_ref())
723    }
724
725    /// Get the device ID that identifies the current session.
726    pub fn device_id(&self) -> Option<&DeviceId> {
727        self.session_meta().map(|s| s.device_id.as_ref())
728    }
729
730    /// Get the current access token for this session.
731    ///
732    /// Will be `None` if the client has not been logged in.
733    pub fn access_token(&self) -> Option<String> {
734        self.auth_ctx().access_token()
735    }
736
737    /// Get the current tokens for this session.
738    ///
739    /// To be notified of changes in the session tokens, use
740    /// [`Client::subscribe_to_session_changes()`] or
741    /// [`Client::set_session_callbacks()`].
742    ///
743    /// Returns `None` if the client has not been logged in.
744    pub fn session_tokens(&self) -> Option<SessionTokens> {
745        self.auth_ctx().session_tokens()
746    }
747
748    /// Access the authentication API used to log in this client.
749    ///
750    /// Will be `None` if the client has not been logged in.
751    pub fn auth_api(&self) -> Option<AuthApi> {
752        match self.auth_ctx().auth_data.get()? {
753            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
754            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
755        }
756    }
757
758    /// Get the whole session info of this client.
759    ///
760    /// Will be `None` if the client has not been logged in.
761    ///
762    /// Can be used with [`Client::restore_session`] to restore a previously
763    /// logged-in session.
764    pub fn session(&self) -> Option<AuthSession> {
765        match self.auth_api()? {
766            AuthApi::Matrix(api) => api.session().map(Into::into),
767            AuthApi::OAuth(api) => api.full_session().map(Into::into),
768        }
769    }
770
771    /// Get a reference to the state store.
772    pub fn state_store(&self) -> &DynStateStore {
773        self.base_client().state_store()
774    }
775
776    /// Get a reference to the event cache store.
777    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
778        self.base_client().event_cache_store()
779    }
780
781    /// Get a reference to the media store.
782    pub fn media_store(&self) -> &MediaStoreLock {
783        self.base_client().media_store()
784    }
785
786    /// Access the native Matrix authentication API with this client.
787    pub fn matrix_auth(&self) -> MatrixAuth {
788        MatrixAuth::new(self.clone())
789    }
790
791    /// Get the account of the current owner of the client.
792    pub fn account(&self) -> Account {
793        Account::new(self.clone())
794    }
795
796    /// Get the encryption manager of the client.
797    #[cfg(feature = "e2e-encryption")]
798    pub fn encryption(&self) -> Encryption {
799        Encryption::new(self.clone())
800    }
801
802    /// Get the media manager of the client.
803    pub fn media(&self) -> Media {
804        Media::new(self.clone(), self.inner.media_fetcher.clone())
805    }
806
807    /// Get the pusher manager of the client.
808    pub fn pusher(&self) -> Pusher {
809        Pusher::new(self.clone())
810    }
811
812    /// Access the OAuth 2.0 API of the client.
813    pub fn oauth(&self) -> OAuth {
814        OAuth::new(self.clone())
815    }
816
817    /// Register a handler for a specific event type.
818    ///
819    /// The handler is a function or closure with one or more arguments. The
820    /// first argument is the event itself. All additional arguments are
821    /// "context" arguments: They have to implement [`EventHandlerContext`].
822    /// This trait is named that way because most of the types implementing it
823    /// give additional context about an event: The room it was in, its raw form
824    /// and other similar things. As two exceptions to this,
825    /// [`Client`] and [`EventHandlerHandle`] also implement the
826    /// `EventHandlerContext` trait so you don't have to clone your client
827    /// into the event handler manually and a handler can decide to remove
828    /// itself.
829    ///
830    /// Some context arguments are not universally applicable. A context
831    /// argument that isn't available for the given event type will result in
832    /// the event handler being skipped and an error being logged. The following
833    /// context argument types are only available for a subset of event types:
834    ///
835    /// * [`Room`] is only available for room-specific events, i.e. not for
836    ///   events like global account data events or presence events.
837    ///
838    /// You can provide custom context via
839    /// [`add_event_handler_context`](Client::add_event_handler_context) and
840    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
841    /// into the event handler.
842    ///
843    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
844    ///
845    /// # Examples
846    ///
847    /// ```no_run
848    /// use matrix_sdk::{
849    ///     deserialized_responses::EncryptionInfo,
850    ///     event_handler::Ctx,
851    ///     ruma::{
852    ///         events::{
853    ///             macros::EventContent,
854    ///             push_rules::PushRulesEvent,
855    ///             room::{
856    ///                 message::SyncRoomMessageEvent,
857    ///                 topic::SyncRoomTopicEvent,
858    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
859    ///             },
860    ///         },
861    ///         push::Action,
862    ///         Int, MilliSecondsSinceUnixEpoch,
863    ///     },
864    ///     Client, Room,
865    /// };
866    /// use serde::{Deserialize, Serialize};
867    ///
868    /// # async fn example(client: Client) {
869    /// client.add_event_handler(
870    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
871    ///         // Common usage: Room event plus room and client.
872    ///     },
873    /// );
874    /// client.add_event_handler(
875    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
876    ///         async move {
877    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
878    ///             // unencrypted events and events that were decrypted by the SDK.
879    ///         }
880    ///     },
881    /// );
882    /// client.add_event_handler(
883    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
884    ///         async move {
885    ///             // A `Vec<Action>` parameter allows you to know which push actions
886    ///             // are applicable for an event. For example, an event with
887    ///             // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
888    ///             // should be highlighted in the timeline.
889    ///         }
890    ///     },
891    /// );
892    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
893    ///     // You can omit any or all arguments after the first.
894    /// });
895    ///
896    /// // Registering a temporary event handler:
897    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
898    ///     /* Event handler */
899    /// });
900    /// client.remove_event_handler(handle);
901    ///
902    /// // Registering custom event handler context:
903    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
904    /// struct MyContext {
905    ///     number: usize,
906    /// }
907    /// client.add_event_handler_context(MyContext { number: 5 });
908    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
909    ///     // Use the context
910    /// });
911    ///
912    /// // This will handle membership events in joined rooms. Invites are special, see below.
913    /// client.add_event_handler(
914    ///     |ev: SyncRoomMemberEvent| async move {},
915    /// );
916    ///
917    /// // To handle state events in invited rooms (including invite membership events),
918    /// // `StrippedRoomMemberEvent` should be used.
919    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
920    /// client.add_event_handler(
921    ///     |ev: StrippedRoomMemberEvent| async move {},
922    /// );
923    ///
924    /// // Custom events work exactly the same way, you just need to declare
925    /// // the content struct and use the EventContent derive macro on it.
926    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
927    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
928    /// struct TokenEventContent {
929    ///     token: String,
930    ///     #[serde(rename = "exp")]
931    ///     expires_at: MilliSecondsSinceUnixEpoch,
932    /// }
933    ///
934    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
935    ///     todo!("Display the token");
936    /// });
937    ///
938    /// // Event handler closures can also capture local variables.
939    /// // Make sure they are cheap to clone though, because they will be cloned
940    /// // every time the closure is called.
941    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
942    ///
943    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
944    ///     println!("Calling the handler with identifier {data}");
945    /// });
946    /// # }
947    /// ```
948    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
949    where
950        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
951        H: EventHandler<Ev, Ctx>,
952    {
953        self.add_event_handler_impl(handler, None)
954    }
955
956    /// Register a handler for a specific room, and event type.
957    ///
958    /// This method works the same way as
959    /// [`add_event_handler`][Self::add_event_handler], except that the handler
960    /// will only be called for events in the room with the specified ID. See
961    /// that method for more details on event handler functions.
962    ///
963    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
964    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
965    /// your use case.
966    pub fn add_room_event_handler<Ev, Ctx, H>(
967        &self,
968        room_id: &RoomId,
969        handler: H,
970    ) -> EventHandlerHandle
971    where
972        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
973        H: EventHandler<Ev, Ctx>,
974    {
975        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
976    }
977
978    /// Observe a specific event type.
979    ///
980    /// `Ev` represents the kind of event that will be observed. `Ctx`
981    /// represents the context that will come with the event. It relies on the
982    /// same mechanism as [`Client::add_event_handler`]. The main difference is
983    /// that it returns an [`ObservableEventHandler`] and doesn't require a
984    /// user-defined closure. It is possible to subscribe to the
985    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
986    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
987    /// Ctx)`.
988    ///
989    /// Be careful that only the most recent value can be observed. Subscribers
990    /// are notified when a new value is sent, but there is no guarantee
991    /// that they will see all values.
992    ///
993    /// # Example
994    ///
995    /// Let's see a classical usage:
996    ///
997    /// ```
998    /// use futures_util::StreamExt as _;
999    /// use matrix_sdk::{
1000    ///     Client, Room,
1001    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
1002    /// };
1003    ///
1004    /// # async fn example(client: Client) -> Option<()> {
1005    /// let observer =
1006    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1007    ///
1008    /// let mut subscriber = observer.subscribe();
1009    ///
1010    /// let (event, (room, push_actions)) = subscriber.next().await?;
1011    /// # Some(())
1012    /// # }
1013    /// ```
1014    ///
1015    /// Now let's see how to get several contexts that can be useful for you:
1016    ///
1017    /// ```
1018    /// use matrix_sdk::{
1019    ///     Client, Room,
1020    ///     deserialized_responses::EncryptionInfo,
1021    ///     ruma::{
1022    ///         events::room::{
1023    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1024    ///         },
1025    ///         push::Action,
1026    ///     },
1027    /// };
1028    ///
1029    /// # async fn example(client: Client) {
1030    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1031    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1032    ///
1033    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1034    /// // to distinguish between unencrypted events and events that were decrypted
1035    /// // by the SDK.
1036    /// let _ = client
1037    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1038    ///     );
1039    ///
1040    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1041    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1042    /// // should be highlighted in the timeline.
1043    /// let _ =
1044    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1045    ///
1046    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1047    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1048    /// # }
1049    /// ```
1050    ///
1051    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1052    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1053    where
1054        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1055        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1056    {
1057        self.observe_room_events_impl(None)
1058    }
1059
1060    /// Observe a specific room, and event type.
1061    ///
1062    /// This method works the same way as [`Client::observe_events`], except
1063    /// that the observability will only be applied for events in the room with
1064    /// the specified ID. See that method for more details.
1065    ///
1066    /// Be careful that only the most recent value can be observed. Subscribers
1067    /// are notified when a new value is sent, but there is no guarantee
1068    /// that they will see all values.
1069    pub fn observe_room_events<Ev, Ctx>(
1070        &self,
1071        room_id: &RoomId,
1072    ) -> ObservableEventHandler<(Ev, Ctx)>
1073    where
1074        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1075        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1076    {
1077        self.observe_room_events_impl(Some(room_id.to_owned()))
1078    }
1079
1080    /// Shared implementation for `Client::observe_events` and
1081    /// `Client::observe_room_events`.
1082    fn observe_room_events_impl<Ev, Ctx>(
1083        &self,
1084        room_id: Option<OwnedRoomId>,
1085    ) -> ObservableEventHandler<(Ev, Ctx)>
1086    where
1087        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1088        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1089    {
1090        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1091        // new value.
1092        let shared_observable = SharedObservable::new(None);
1093
1094        ObservableEventHandler::new(
1095            shared_observable.clone(),
1096            self.event_handler_drop_guard(self.add_event_handler_impl(
1097                move |event: Ev, context: Ctx| {
1098                    shared_observable.set(Some((event, context)));
1099
1100                    ready(())
1101                },
1102                room_id,
1103            )),
1104        )
1105    }
1106
1107    /// Subscribe to future `beacon_info` updates for the current user across
1108    /// all rooms.
1109    ///
1110    /// This stream is push-only: it emits only future updates observed during
1111    /// sync processing and does not replay existing state.
1112    pub fn observe_own_beacon_info_updates(
1113        &self,
1114    ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1115        let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1116        let mut stream = observer.subscribe();
1117        let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1118        Ok(async_stream::stream! {
1119            let _observer = observer;
1120
1121            while let Some((event, room)) = stream.next().await {
1122                if event.state_key != own_user_id {
1123                    continue;
1124                }
1125                yield BeaconInfoUpdate {
1126                    room_id: room.room_id().to_owned(),
1127                    event_id: event.event_id,
1128                    content: event.content,
1129                };
1130            }
1131        })
1132    }
1133
1134    /// Remove the event handler associated with the handle.
1135    ///
1136    /// Note that you **must not** call `remove_event_handler` from the
1137    /// non-async part of an event handler, that is:
1138    ///
1139    /// ```ignore
1140    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1141    ///     // ⚠ this will cause a deadlock ⚠
1142    ///     client.remove_event_handler(handle);
1143    ///
1144    ///     async move {
1145    ///         // removing the event handler here is fine
1146    ///         client.remove_event_handler(handle);
1147    ///     }
1148    /// })
1149    /// ```
1150    ///
1151    /// Note also that handlers that remove themselves will still execute with
1152    /// events received in the same sync cycle.
1153    ///
1154    /// # Arguments
1155    ///
1156    /// `handle` - The [`EventHandlerHandle`] that is returned when
1157    /// registering the event handler with [`Client::add_event_handler`].
1158    ///
1159    /// # Examples
1160    ///
1161    /// ```no_run
1162    /// # use url::Url;
1163    /// # use tokio::sync::mpsc;
1164    /// #
1165    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1166    /// #
1167    /// use matrix_sdk::{
1168    ///     Client, event_handler::EventHandlerHandle,
1169    ///     ruma::events::room::member::SyncRoomMemberEvent,
1170    /// };
1171    /// #
1172    /// # futures_executor::block_on(async {
1173    /// # let client = matrix_sdk::Client::builder()
1174    /// #     .homeserver_url(homeserver)
1175    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1176    /// #     .build()
1177    /// #     .await
1178    /// #     .unwrap();
1179    ///
1180    /// client.add_event_handler(
1181    ///     |ev: SyncRoomMemberEvent,
1182    ///      client: Client,
1183    ///      handle: EventHandlerHandle| async move {
1184    ///         // Common usage: Check arriving Event is the expected one
1185    ///         println!("Expected RoomMemberEvent received!");
1186    ///         client.remove_event_handler(handle);
1187    ///     },
1188    /// );
1189    /// # });
1190    /// ```
1191    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1192        self.inner.event_handlers.remove(handle);
1193    }
1194
1195    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1196    /// the given handle.
1197    ///
1198    /// When the returned value is dropped, the event handler will be removed.
1199    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1200        EventHandlerDropGuard::new(handle, self.clone())
1201    }
1202
1203    /// Add an arbitrary value for use as event handler context.
1204    ///
1205    /// The value can be obtained in an event handler by adding an argument of
1206    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1207    ///
1208    /// If a value of the same type has been added before, it will be
1209    /// overwritten.
1210    ///
1211    /// # Examples
1212    ///
1213    /// ```no_run
1214    /// use matrix_sdk::{
1215    ///     Room, event_handler::Ctx,
1216    ///     ruma::events::room::message::SyncRoomMessageEvent,
1217    /// };
1218    /// # #[derive(Clone)]
1219    /// # struct SomeType;
1220    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1221    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1222    /// # futures_executor::block_on(async {
1223    /// # let client = matrix_sdk::Client::builder()
1224    /// #     .homeserver_url(homeserver)
1225    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1226    /// #     .build()
1227    /// #     .await
1228    /// #     .unwrap();
1229    ///
1230    /// // Handle used to send messages to the UI part of the app
1231    /// let my_gui_handle: SomeType = obtain_gui_handle();
1232    ///
1233    /// client.add_event_handler_context(my_gui_handle.clone());
1234    /// client.add_event_handler(
1235    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1236    ///         async move {
1237    ///             // gui_handle.send(DisplayMessage { message: ev });
1238    ///         }
1239    ///     },
1240    /// );
1241    /// # });
1242    /// ```
1243    pub fn add_event_handler_context<T>(&self, ctx: T)
1244    where
1245        T: Clone + Send + Sync + 'static,
1246    {
1247        self.inner.event_handlers.add_context(ctx);
1248    }
1249
1250    /// Register a handler for a notification.
1251    ///
1252    /// Similar to [`Client::add_event_handler`], but only allows functions
1253    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1254    /// [`Client`] for now.
1255    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1256    where
1257        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1258        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1259    {
1260        self.inner.notification_handlers.write().await.push(Box::new(
1261            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1262        ));
1263
1264        self
1265    }
1266
1267    /// Subscribe to all updates for the room with the given ID.
1268    ///
1269    /// The returned receiver will receive a new message for each sync response
1270    /// that contains updates for that room.
1271    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1272        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1273            btree_map::Entry::Vacant(entry) => {
1274                let (tx, rx) = broadcast::channel(8);
1275                entry.insert(tx);
1276                rx
1277            }
1278            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1279        }
1280    }
1281
1282    /// Subscribe to all updates to all rooms, whenever any has been received in
1283    /// a sync response.
1284    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1285        self.inner.room_updates_sender.subscribe()
1286    }
1287
1288    pub(crate) async fn notification_handlers(
1289        &self,
1290    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1291        self.inner.notification_handlers.read().await
1292    }
1293
1294    /// Get all the rooms the client knows about.
1295    ///
1296    /// This will return the list of joined, invited, and left rooms.
1297    pub fn rooms(&self) -> Vec<Room> {
1298        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1299    }
1300
1301    /// Get all the rooms the client knows about, filtered by room state.
1302    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1303        self.base_client()
1304            .rooms_filtered(filter)
1305            .into_iter()
1306            .map(|room| Room::new(self.clone(), room))
1307            .collect()
1308    }
1309
1310    /// Get a stream of all the rooms, in addition to the existing rooms.
1311    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1312        let (rooms, stream) = self.base_client().rooms_stream();
1313
1314        let map_room = |room| Room::new(self.clone(), room);
1315
1316        (
1317            rooms.into_iter().map(map_room).collect(),
1318            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1319        )
1320    }
1321
1322    /// Returns the joined rooms this client knows about.
1323    pub fn joined_rooms(&self) -> Vec<Room> {
1324        self.rooms_filtered(RoomStateFilter::JOINED)
1325    }
1326
1327    /// Returns the invited rooms this client knows about.
1328    pub fn invited_rooms(&self) -> Vec<Room> {
1329        self.rooms_filtered(RoomStateFilter::INVITED)
1330    }
1331
1332    /// Returns the left rooms this client knows about.
1333    pub fn left_rooms(&self) -> Vec<Room> {
1334        self.rooms_filtered(RoomStateFilter::LEFT)
1335    }
1336
1337    /// Returns the joined space rooms this client knows about.
1338    pub fn joined_space_rooms(&self) -> Vec<Room> {
1339        self.base_client()
1340            .rooms_filtered(RoomStateFilter::JOINED)
1341            .into_iter()
1342            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1343            .collect()
1344    }
1345
1346    /// Get a room with the given room id.
1347    ///
1348    /// # Arguments
1349    ///
1350    /// `room_id` - The unique id of the room that should be fetched.
1351    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1352        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1353    }
1354
1355    /// Gets the preview of a room, whether the current user has joined it or
1356    /// not.
1357    pub async fn get_room_preview(
1358        &self,
1359        room_or_alias_id: &RoomOrAliasId,
1360        via: Vec<OwnedServerName>,
1361    ) -> Result<RoomPreview> {
1362        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1363            Ok(room_id) => room_id.to_owned(),
1364            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1365        };
1366
1367        if let Some(room) = self.get_room(&room_id) {
1368            // The cached data can only be trusted if the room state is joined or
1369            // banned: for invite and knock rooms, no updates will be received
1370            // for the rooms after the invite/knock action took place so we may
1371            // have very out to date data for important fields such as
1372            // `join_rule`. For left rooms, the homeserver should return the latest info.
1373            match room.state() {
1374                RoomState::Joined | RoomState::Banned => {
1375                    return Ok(RoomPreview::from_known_room(&room).await);
1376                }
1377                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1378            }
1379        }
1380
1381        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1382    }
1383
1384    /// Resolve a room alias to a room id and a list of servers which know
1385    /// about it.
1386    ///
1387    /// # Arguments
1388    ///
1389    /// `room_alias` - The room alias to be resolved.
1390    pub async fn resolve_room_alias(
1391        &self,
1392        room_alias: &RoomAliasId,
1393    ) -> HttpResult<get_alias::v3::Response> {
1394        let request = get_alias::v3::Request::new(room_alias.to_owned());
1395        self.send(request).await
1396    }
1397
1398    /// Checks if a room alias is not in use yet.
1399    ///
1400    /// Returns:
1401    /// - `Ok(true)` if the room alias is available.
1402    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1403    ///   status code).
1404    /// - An `Err` otherwise.
1405    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1406        match self.resolve_room_alias(alias).await {
1407            // The room alias was resolved, so it's already in use.
1408            Ok(_) => Ok(false),
1409            Err(error) => {
1410                match error.client_api_error_kind() {
1411                    // The room alias wasn't found, so it's available.
1412                    Some(ErrorKind::NotFound) => Ok(true),
1413                    _ => Err(error),
1414                }
1415            }
1416        }
1417    }
1418
1419    /// Adds a new room alias associated with a room to the room directory.
1420    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1421        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1422        self.send(request).await?;
1423        Ok(())
1424    }
1425
1426    /// Removes a room alias from the room directory.
1427    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1428        let request = delete_alias::v3::Request::new(alias.to_owned());
1429        self.send(request).await?;
1430        Ok(())
1431    }
1432
1433    /// Update the homeserver from the login response well-known if needed.
1434    ///
1435    /// # Arguments
1436    ///
1437    /// * `login_well_known` - The `well_known` field from a successful login
1438    ///   response.
1439    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1440        if self.inner.respect_login_well_known
1441            && let Some(well_known) = login_well_known
1442            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1443        {
1444            self.set_homeserver(homeserver);
1445        }
1446    }
1447
1448    /// Similar to [`Client::restore_session_with`], with
1449    /// [`RoomLoadSettings::default()`].
1450    ///
1451    /// # Panics
1452    ///
1453    /// Panics if a session was already restored or logged in.
1454    #[instrument(skip_all)]
1455    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1456        self.restore_session_with(session, RoomLoadSettings::default()).await
1457    }
1458
1459    /// Restore a session previously logged-in using one of the available
1460    /// authentication APIs. The number of rooms to restore is controlled by
1461    /// [`RoomLoadSettings`].
1462    ///
1463    /// See the documentation of the corresponding authentication API's
1464    /// `restore_session` method for more information.
1465    ///
1466    /// # Panics
1467    ///
1468    /// Panics if a session was already restored or logged in.
1469    #[instrument(skip_all)]
1470    pub async fn restore_session_with(
1471        &self,
1472        session: impl Into<AuthSession>,
1473        room_load_settings: RoomLoadSettings,
1474    ) -> Result<()> {
1475        let session = session.into();
1476        match session {
1477            AuthSession::Matrix(session) => {
1478                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1479            }
1480            AuthSession::OAuth(session) => {
1481                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1482            }
1483        }
1484    }
1485
1486    /// Refresh the access token using the authentication API used to log into
1487    /// this session.
1488    ///
1489    /// See the documentation of the authentication API's `refresh_access_token`
1490    /// method for more information.
1491    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1492        let Some(auth_api) = self.auth_api() else {
1493            return Err(RefreshTokenError::RefreshTokenRequired);
1494        };
1495
1496        match auth_api {
1497            AuthApi::Matrix(api) => {
1498                trace!("Token refresh: Using the homeserver.");
1499                Box::pin(api.refresh_access_token()).await?;
1500            }
1501            AuthApi::OAuth(api) => {
1502                trace!("Token refresh: Using OAuth 2.0.");
1503                Box::pin(api.refresh_access_token()).await?;
1504            }
1505        }
1506
1507        Ok(())
1508    }
1509
1510    /// Log out the current session using the proper authentication API.
1511    ///
1512    /// # Errors
1513    ///
1514    /// Returns an error if the session is not authenticated or if an error
1515    /// occurred while making the request to the server.
1516    pub async fn logout(&self) -> Result<(), Error> {
1517        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1518        match auth_api {
1519            AuthApi::Matrix(matrix_auth) => {
1520                matrix_auth.logout().await?;
1521                Ok(())
1522            }
1523            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1524        }
1525    }
1526
1527    /// Get or upload a sync filter.
1528    ///
1529    /// This method will either get a filter ID from the store or upload the
1530    /// filter definition to the homeserver and return the new filter ID.
1531    ///
1532    /// # Arguments
1533    ///
1534    /// * `filter_name` - The unique name of the filter, this name will be used
1535    /// locally to store and identify the filter ID returned by the server.
1536    ///
1537    /// * `definition` - The filter definition that should be uploaded to the
1538    /// server if no filter ID can be found in the store.
1539    ///
1540    /// # Examples
1541    ///
1542    /// ```no_run
1543    /// # use matrix_sdk::{
1544    /// #    Client, config::SyncSettings,
1545    /// #    ruma::api::client::{
1546    /// #        filter::{
1547    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1548    /// #        },
1549    /// #        sync::sync_events::v3::Filter,
1550    /// #    }
1551    /// # };
1552    /// # use url::Url;
1553    /// # async {
1554    /// # let homeserver = Url::parse("http://example.com").unwrap();
1555    /// # let client = Client::new(homeserver).await.unwrap();
1556    /// let mut filter = FilterDefinition::default();
1557    ///
1558    /// // Let's enable member lazy loading.
1559    /// filter.room.state.lazy_load_options =
1560    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1561    ///
1562    /// let filter_id = client
1563    ///     .get_or_upload_filter("sync", filter)
1564    ///     .await
1565    ///     .unwrap();
1566    ///
1567    /// let sync_settings = SyncSettings::new()
1568    ///     .filter(Filter::FilterId(filter_id));
1569    ///
1570    /// let response = client.sync_once(sync_settings).await.unwrap();
1571    /// # };
1572    #[instrument(skip(self, definition))]
1573    pub async fn get_or_upload_filter(
1574        &self,
1575        filter_name: &str,
1576        definition: FilterDefinition,
1577    ) -> Result<String> {
1578        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1579            debug!("Found filter locally");
1580            Ok(filter)
1581        } else {
1582            debug!("Didn't find filter locally");
1583            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1584            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1585            let response = self.send(request).await?;
1586
1587            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1588
1589            Ok(response.filter_id)
1590        }
1591    }
1592
1593    /// Prepare to join a room by ID, by getting the current details about it
1594    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1595        let room = self.get_room(room_id)?;
1596
1597        let inviter = match room.invite_details().await {
1598            Ok(details) => details.inviter,
1599            Err(Error::WrongRoomState(_)) => None,
1600            Err(e) => {
1601                warn!("Error fetching invite details for room: {e:?}");
1602                None
1603            }
1604        };
1605
1606        Some(PreJoinRoomInfo { inviter })
1607    }
1608
1609    /// Finish joining a room.
1610    ///
1611    /// If the room was an invite that should be marked as a DM, will include it
1612    /// in the DM event after creating the joined room.
1613    ///
1614    /// If encrypted history sharing is enabled, will check to see if we have a
1615    /// key bundle, and import it if so.
1616    ///
1617    /// # Arguments
1618    ///
1619    /// * `room_id` - The `RoomId` of the room that was joined.
1620    /// * `pre_join_room_info` - Information about the room before we joined.
1621    async fn finish_join_room(
1622        &self,
1623        room_id: &RoomId,
1624        pre_join_room_info: Option<PreJoinRoomInfo>,
1625    ) -> Result<Room> {
1626        info!(?room_id, ?pre_join_room_info, "Completing room join");
1627        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1628            room.state() == RoomState::Invited
1629                && room.is_direct().await.unwrap_or_else(|e| {
1630                    warn!(%room_id, "is_direct() failed: {e}");
1631                    false
1632                })
1633        } else {
1634            false
1635        };
1636
1637        let base_room = self
1638            .base_client()
1639            .room_joined(
1640                room_id,
1641                pre_join_room_info
1642                    .as_ref()
1643                    .and_then(|info| info.inviter.as_ref())
1644                    .map(|i| i.user_id().to_owned()),
1645            )
1646            .await?;
1647        let room = Room::new(self.clone(), base_room);
1648
1649        if mark_as_dm {
1650            room.set_is_direct(true).await?;
1651        }
1652
1653        // If we joined following an invite, check if we had previously received a key
1654        // bundle from the inviter, and import it if so.
1655        //
1656        // It's important that we only do this once `BaseClient::room_joined` has
1657        // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1658        // race.
1659        #[cfg(feature = "e2e-encryption")]
1660        if self.inner.enable_share_history_on_invite
1661            && let Some(inviter) =
1662                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1663        {
1664            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1665                .await?;
1666        }
1667
1668        // Suppress "unused variable" and "unused field" lints
1669        #[cfg(not(feature = "e2e-encryption"))]
1670        let _ = pre_join_room_info.map(|i| i.inviter);
1671
1672        Ok(room)
1673    }
1674
1675    /// Join a room by `RoomId`.
1676    ///
1677    /// Returns the `Room` in the joined state.
1678    ///
1679    /// # Arguments
1680    ///
1681    /// * `room_id` - The `RoomId` of the room to be joined.
1682    #[instrument(skip(self))]
1683    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1684        // See who invited us to this room, if anyone. Note we have to do this before
1685        // making the `/join` request, otherwise we could race against the sync.
1686        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1687
1688        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1689        let response = self.send(request).await?;
1690        self.finish_join_room(&response.room_id, pre_join_info).await
1691    }
1692
1693    /// Join a room by `RoomOrAliasId`.
1694    ///
1695    /// Returns the `Room` in the joined state.
1696    ///
1697    /// # Arguments
1698    ///
1699    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1700    ///   alias looks like `#name:example.com`.
1701    /// * `server_names` - The server names to be used for resolving the alias,
1702    ///   if needs be.
1703    #[instrument(skip(self))]
1704    pub async fn join_room_by_id_or_alias(
1705        &self,
1706        alias: &RoomOrAliasId,
1707        server_names: &[OwnedServerName],
1708    ) -> Result<Room> {
1709        let room_id = match <&RoomId>::try_from(alias) {
1710            Ok(room_id) => room_id,
1711            Err(room_alias) => &self.resolve_room_alias(room_alias).await?.room_id,
1712        };
1713        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1714        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1715            via: server_names.to_owned(),
1716        });
1717        let response = self.send(request).await?;
1718        self.finish_join_room(&response.room_id, pre_join_info).await
1719    }
1720
1721    /// Search the homeserver's directory of public rooms.
1722    ///
1723    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1724    /// a `get_public_rooms::Response`.
1725    ///
1726    /// # Arguments
1727    ///
1728    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1729    ///
1730    /// * `since` - Pagination token from a previous request.
1731    ///
1732    /// * `server` - The name of the server, if `None` the requested server is
1733    ///   used.
1734    ///
1735    /// # Examples
1736    /// ```no_run
1737    /// use matrix_sdk::Client;
1738    /// # use url::Url;
1739    /// # let homeserver = Url::parse("http://example.com").unwrap();
1740    /// # let limit = Some(10);
1741    /// # let since = Some("since token");
1742    /// # let server = Some("servername.com".try_into().unwrap());
1743    /// # async {
1744    /// let mut client = Client::new(homeserver).await.unwrap();
1745    ///
1746    /// client.public_rooms(limit, since, server).await;
1747    /// # };
1748    /// ```
1749    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1750    pub async fn public_rooms(
1751        &self,
1752        limit: Option<u32>,
1753        since: Option<&str>,
1754        server: Option<&ServerName>,
1755    ) -> HttpResult<get_public_rooms::v3::Response> {
1756        let limit = limit.map(UInt::from);
1757
1758        let request = assign!(get_public_rooms::v3::Request::new(), {
1759            limit,
1760            since: since.map(ToOwned::to_owned),
1761            server: server.map(ToOwned::to_owned),
1762        });
1763        self.send(request).await
1764    }
1765
1766    /// Create a room with the given parameters.
1767    ///
1768    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1769    /// created room.
1770    ///
1771    /// If you want to create a direct message with one specific user, you can
1772    /// use [`create_dm`][Self::create_dm], which is more convenient than
1773    /// assembling the [`create_room::v3::Request`] yourself.
1774    ///
1775    /// If the `is_direct` field of the request is set to `true` and at least
1776    /// one user is invited, the room will be automatically added to the direct
1777    /// rooms in the account data.
1778    ///
1779    /// # Examples
1780    ///
1781    /// ```no_run
1782    /// use matrix_sdk::{
1783    ///     Client,
1784    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1785    /// };
1786    /// # use url::Url;
1787    /// #
1788    /// # async {
1789    /// # let homeserver = Url::parse("http://example.com").unwrap();
1790    /// let request = CreateRoomRequest::new();
1791    /// let client = Client::new(homeserver).await.unwrap();
1792    /// assert!(client.create_room(request).await.is_ok());
1793    /// # };
1794    /// ```
1795    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1796        let invite = request.invite.clone();
1797        let is_direct_room = request.is_direct;
1798        let response = self.send(request).await?;
1799
1800        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1801
1802        let joined_room = Room::new(self.clone(), base_room);
1803
1804        if is_direct_room
1805            && !invite.is_empty()
1806            && let Err(error) =
1807                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1808        {
1809            // FIXME: Retry in the background
1810            error!("Failed to mark room as DM: {error}");
1811        }
1812
1813        Ok(joined_room)
1814    }
1815
1816    /// Create a DM room.
1817    ///
1818    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1819    /// given user being invited, the room marked `is_direct` and both the
1820    /// creator and invitee getting the default maximum power level.
1821    ///
1822    /// If the `e2e-encryption` feature is enabled, the room will also be
1823    /// encrypted.
1824    ///
1825    /// # Arguments
1826    ///
1827    /// * `user_id` - The ID of the user to create a DM for.
1828    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1829        #[cfg(feature = "e2e-encryption")]
1830        let initial_state = vec![
1831            InitialStateEvent::with_empty_state_key(
1832                RoomEncryptionEventContent::with_recommended_defaults(),
1833            )
1834            .to_raw_any(),
1835        ];
1836
1837        #[cfg(not(feature = "e2e-encryption"))]
1838        let initial_state = vec![];
1839
1840        let request = assign!(create_room::v3::Request::new(), {
1841            invite: vec![user_id.to_owned()],
1842            is_direct: true,
1843            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1844            initial_state,
1845        });
1846
1847        self.create_room(request).await
1848    }
1849
1850    /// Get the first existing DM room with the given user, if any.
1851    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1852        self.get_dm_rooms(user_id).next()
1853    }
1854
1855    /// Get an iterator with the existing DM rooms for the given user.
1856    pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1857        let rooms = self.joined_rooms();
1858
1859        let dm_definition = &self.base_client().dm_room_definition;
1860
1861        // Find the room we share with the `user_id` and only with `user_id`
1862        let rooms = rooms.into_iter().filter(move |r| {
1863            let targets = r.direct_targets();
1864            let targets_match =
1865                targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1866            match dm_definition {
1867                DmRoomDefinition::MatrixSpec => targets_match,
1868                DmRoomDefinition::TwoMembers => {
1869                    let service_members_count =
1870                        r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1871                    let active_non_service_members =
1872                        r.active_members_count().saturating_sub(service_members_count);
1873                    targets_match && active_non_service_members <= 2
1874                }
1875            }
1876        });
1877
1878        trace!(?user_id, ?rooms, "Found DM rooms with user");
1879        rooms
1880    }
1881
1882    /// Search the homeserver's directory for public rooms with a filter.
1883    ///
1884    /// # Arguments
1885    ///
1886    /// * `room_search` - The easiest way to create this request is using the
1887    ///   `get_public_rooms_filtered::Request` itself.
1888    ///
1889    /// # Examples
1890    ///
1891    /// ```no_run
1892    /// # use url::Url;
1893    /// # use matrix_sdk::Client;
1894    /// # async {
1895    /// # let homeserver = Url::parse("http://example.com")?;
1896    /// use matrix_sdk::ruma::{
1897    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1898    /// };
1899    /// # let mut client = Client::new(homeserver).await?;
1900    ///
1901    /// let mut filter = Filter::new();
1902    /// filter.generic_search_term = Some("rust".to_owned());
1903    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1904    /// request.filter = filter;
1905    ///
1906    /// let response = client.public_rooms_filtered(request).await?;
1907    ///
1908    /// for room in response.chunk {
1909    ///     println!("Found room {room:?}");
1910    /// }
1911    /// # anyhow::Ok(()) };
1912    /// ```
1913    pub async fn public_rooms_filtered(
1914        &self,
1915        request: get_public_rooms_filtered::v3::Request,
1916    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1917        self.send(request).await
1918    }
1919
1920    /// Send an arbitrary request to the server, without updating client state.
1921    ///
1922    /// **Warning:** Because this method *does not* update the client state, it
1923    /// is important to make sure that you account for this yourself, and
1924    /// use wrapper methods where available.  This method should *only* be
1925    /// used if a wrapper method for the endpoint you'd like to use is not
1926    /// available.
1927    ///
1928    /// # Arguments
1929    ///
1930    /// * `request` - A filled out and valid request for the endpoint to be hit
1931    ///
1932    /// * `timeout` - An optional request timeout setting, this overrides the
1933    ///   default request setting if one was set.
1934    ///
1935    /// # Examples
1936    ///
1937    /// ```no_run
1938    /// # use matrix_sdk::{Client, config::SyncSettings};
1939    /// # use url::Url;
1940    /// # async {
1941    /// # let homeserver = Url::parse("http://localhost:8080")?;
1942    /// # let mut client = Client::new(homeserver).await?;
1943    /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1944    ///
1945    /// // First construct the request you want to make
1946    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1947    /// // for all available Endpoints
1948    /// let user_id = owned_user_id!("@example:localhost");
1949    /// let request = profile::get_profile::v3::Request::new(user_id);
1950    ///
1951    /// // Start the request using Client::send()
1952    /// let response = client.send(request).await?;
1953    ///
1954    /// // Check the corresponding Response struct to find out what types are
1955    /// // returned
1956    /// # anyhow::Ok(()) };
1957    /// ```
1958    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1959    where
1960        Request: OutgoingRequest + Clone + Debug,
1961        Request::Authentication: SupportedAuthScheme,
1962        Request::PathBuilder: SupportedPathBuilder,
1963        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1964        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1965    {
1966        SendRequest {
1967            client: self.clone(),
1968            request,
1969            config: None,
1970            send_progress: Default::default(),
1971        }
1972    }
1973
1974    pub(crate) async fn send_inner<Request>(
1975        &self,
1976        request: Request,
1977        config: Option<RequestConfig>,
1978        send_progress: SharedObservable<TransmissionProgress>,
1979    ) -> HttpResult<Request::IncomingResponse>
1980    where
1981        Request: OutgoingRequest + Debug,
1982        Request::Authentication: SupportedAuthScheme,
1983        Request::PathBuilder: SupportedPathBuilder,
1984        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1985        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1986    {
1987        let homeserver = self.homeserver().to_string();
1988        let access_token = self.access_token();
1989        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1990
1991        let path_builder_input =
1992            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1993
1994        let result = self
1995            .inner
1996            .http_client
1997            .send(
1998                request,
1999                config,
2000                homeserver,
2001                access_token.as_deref(),
2002                path_builder_input,
2003                send_progress,
2004            )
2005            .await;
2006
2007        if let Err(Some(ErrorKind::UnknownToken { .. })) =
2008            result.as_ref().map_err(HttpError::client_api_error_kind)
2009            && let Some(access_token) = &access_token
2010        {
2011            // Mark the access token as expired.
2012            self.auth_ctx().set_access_token_expired(access_token);
2013        }
2014
2015        result
2016    }
2017
2018    fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2019        _ = self
2020            .inner
2021            .auth_ctx
2022            .session_change_sender
2023            .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2024    }
2025
2026    /// Fetches server versions from network; no caching.
2027    pub async fn fetch_server_versions(
2028        &self,
2029        request_config: Option<RequestConfig>,
2030    ) -> HttpResult<get_supported_versions::Response> {
2031        // Since this was called by the user, try to refresh the access token if
2032        // necessary.
2033        self.fetch_server_versions_inner(false, request_config).await
2034    }
2035
2036    /// Fetches server versions from network; no caching.
2037    ///
2038    /// If the access token is expired and `failsafe` is `false`, this will
2039    /// attempt to refresh the access token, otherwise this will try to make an
2040    /// unauthenticated request instead.
2041    pub(crate) async fn fetch_server_versions_inner(
2042        &self,
2043        failsafe: bool,
2044        request_config: Option<RequestConfig>,
2045    ) -> HttpResult<get_supported_versions::Response> {
2046        if !failsafe {
2047            // `Client::send()` handles refreshing access tokens.
2048            return self
2049                .send(get_supported_versions::Request::new())
2050                .with_request_config(request_config)
2051                .await;
2052        }
2053
2054        let homeserver = self.homeserver().to_string();
2055
2056        // If we have a fresh access token, try with it first.
2057        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2058            && self.auth_ctx().has_valid_access_token()
2059            && let Some(access_token) = self.access_token()
2060        {
2061            let result = self
2062                .inner
2063                .http_client
2064                .send(
2065                    get_supported_versions::Request::new(),
2066                    request_config,
2067                    homeserver.clone(),
2068                    Some(&access_token),
2069                    (),
2070                    Default::default(),
2071                )
2072                .await;
2073
2074            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2075                result.as_ref().map_err(HttpError::client_api_error_kind)
2076            {
2077                // If the access token is actually expired, mark it as expired and fallback to
2078                // the unauthenticated request below.
2079                self.auth_ctx().set_access_token_expired(&access_token);
2080            } else {
2081                // If the request succeeded or it's an other error, just stop now.
2082                return result;
2083            }
2084        }
2085
2086        // Try without authentication.
2087        self.inner
2088            .http_client
2089            .send(
2090                get_supported_versions::Request::new(),
2091                request_config,
2092                homeserver.clone(),
2093                None,
2094                (),
2095                Default::default(),
2096            )
2097            .await
2098    }
2099
2100    /// Fetches client well_known from network; no caching.
2101    ///
2102    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2103    ///    well-known contents.
2104    /// 2. If it's not, we try extracting the server name from the
2105    ///    [`Client::user_id`] and building the server URL from it.
2106    /// 3. If we couldn't get the well-known contents with either the explicit
2107    ///    server name or the implicit extracted one, we try the homeserver URL
2108    ///    as a last resort.
2109    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2110        let homeserver = self.homeserver();
2111        let scheme = homeserver.scheme();
2112
2113        // Use the server name, either an explicit one or an implicit one taken from
2114        // the user id: sometimes we'll have only the homeserver url available and no
2115        // server name, but the server name can be extracted from the current user id.
2116        let server_url = self
2117            .server()
2118            .map(|server| server.to_string())
2119            // If the server name wasn't available, extract it from the user id and build a URL:
2120            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2121            // will be the same for the public server url, lacking a better candidate.
2122            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2123
2124        // If the server name is available, first try using it
2125        let response = if let Some(server_url) = server_url {
2126            // First try using the server name
2127            self.fetch_client_well_known_with_url(server_url).await
2128        } else {
2129            None
2130        };
2131
2132        // If we didn't get a well-known value yet, try with the homeserver url instead:
2133        if response.is_none() {
2134            // Sometimes people configure their well-known directly on the homeserver so use
2135            // this as a fallback when the server name is unknown.
2136            warn!(
2137                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2138            );
2139            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2140        } else {
2141            response
2142        }
2143    }
2144
2145    async fn fetch_client_well_known_with_url(
2146        &self,
2147        url: String,
2148    ) -> Option<discover_homeserver::Response> {
2149        let well_known = self
2150            .inner
2151            .http_client
2152            .send(
2153                discover_homeserver::Request::new(),
2154                Some(RequestConfig::short_retry()),
2155                url,
2156                None,
2157                (),
2158                Default::default(),
2159            )
2160            .await;
2161
2162        match well_known {
2163            Ok(well_known) => Some(well_known),
2164            Err(http_error) => {
2165                // It is perfectly valid to not have a well-known file.
2166                // Maybe we should check for a specific error code to be sure?
2167                warn!("Failed to fetch client well-known: {http_error}");
2168                None
2169            }
2170        }
2171    }
2172
2173    /// Load supported versions from storage, or fetch them from network and
2174    /// cache them.
2175    ///
2176    /// If `failsafe` is true, this will try to minimize side effects to avoid
2177    /// possible deadlocks.
2178    async fn fetch_supported_versions(
2179        &self,
2180        failsafe: bool,
2181    ) -> HttpResult<SupportedVersionsResponse> {
2182        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2183        let supported_versions = SupportedVersionsResponse {
2184            versions: server_versions.versions,
2185            unstable_features: server_versions.unstable_features,
2186        };
2187
2188        Ok(supported_versions)
2189    }
2190
2191    /// Get the Matrix versions and features supported by the homeserver by
2192    /// fetching them from the server or the cache.
2193    ///
2194    /// This is equivalent to calling both [`Client::server_versions()`] and
2195    /// [`Client::unstable_features()`]. To always fetch the result from the
2196    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2197    /// and then `.as_supported_versions()` on the response.
2198    ///
2199    /// # Examples
2200    ///
2201    /// ```no_run
2202    /// use ruma::api::{FeatureFlag, MatrixVersion};
2203    /// # use matrix_sdk::{Client, config::SyncSettings};
2204    /// # use url::Url;
2205    /// # async {
2206    /// # let homeserver = Url::parse("http://localhost:8080")?;
2207    /// # let mut client = Client::new(homeserver).await?;
2208    ///
2209    /// let supported = client.supported_versions().await?;
2210    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2211    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2212    ///
2213    /// let msc_x_feature = FeatureFlag::from("msc_x");
2214    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2215    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2216    /// # anyhow::Ok(()) };
2217    /// ```
2218    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2219        self.supported_versions_inner(false).await
2220    }
2221
2222    /// Get the Matrix versions and features supported by the homeserver by
2223    /// fetching them from the server or the cache.
2224    ///
2225    /// If `failsafe` is true, this will try to minimize side effects to avoid
2226    /// possible deadlocks.
2227    pub(crate) async fn supported_versions_inner(
2228        &self,
2229        failsafe: bool,
2230    ) -> HttpResult<SupportedVersions> {
2231        match self.supported_versions_cached_inner(failsafe).await {
2232            Ok(Some(value)) => {
2233                return Ok(value);
2234            }
2235            Ok(None) => {
2236                // The cache is empty, make a request.
2237            }
2238            Err(error) => {
2239                warn!("error when loading cached supported versions: {error}");
2240                // Fallthrough to make a request.
2241            }
2242        }
2243
2244        self.refresh_supported_versions_cache(failsafe).await
2245    }
2246
2247    /// Refresh the Matrix versions and features supported by the homeserver in
2248    /// the cache.
2249    ///
2250    /// If `failsafe` is true, this will try to minimize side effects to avoid
2251    /// possible deadlocks.
2252    async fn refresh_supported_versions_cache(
2253        &self,
2254        failsafe: bool,
2255    ) -> HttpResult<SupportedVersions> {
2256        let cached_supported_versions = &self.inner.caches.supported_versions;
2257
2258        let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
2259            Ok(guard) => guard,
2260            Err(_) => {
2261                // There is already a refresh in progress, wait for it to finish.
2262                let guard = cached_supported_versions.refresh_lock.lock().await;
2263
2264                if let Err(error) = guard.as_ref() {
2265                    // There was an error in the previous refresh, return it.
2266                    return Err(HttpError::Cached(error.clone()));
2267                }
2268
2269                // Reuse the data if it was cached and it hasn't expired.
2270                if let CachedValue::Cached(value) = cached_supported_versions.value()
2271                    && !value.has_expired()
2272                {
2273                    return Ok(value.into_data());
2274                }
2275
2276                // The data wasn't cached or has expired, we need to make another request.
2277                guard
2278            }
2279        };
2280
2281        let response = match self.fetch_supported_versions(failsafe).await {
2282            Ok(response) => {
2283                *supported_versions_guard = Ok(());
2284                TtlValue::new(response)
2285            }
2286            Err(error) => {
2287                let error = Arc::new(error);
2288                *supported_versions_guard = Err(error.clone());
2289                return Err(HttpError::Cached(error));
2290            }
2291        };
2292
2293        let supported_versions = response.as_ref().map(|response| response.supported_versions());
2294
2295        // Only cache the result if the request was authenticated.
2296        if self.auth_ctx().has_valid_access_token() {
2297            if let Err(err) = self
2298                .state_store()
2299                .set_kv_data(
2300                    StateStoreDataKey::SupportedVersions,
2301                    StateStoreDataValue::SupportedVersions(response),
2302                )
2303                .await
2304            {
2305                warn!("error when caching supported versions: {err}");
2306            }
2307
2308            cached_supported_versions.set_value(supported_versions.clone());
2309        }
2310
2311        Ok(supported_versions.into_data())
2312    }
2313
2314    /// Get the Matrix versions and features supported by the homeserver by
2315    /// fetching them from the cache.
2316    ///
2317    /// For a version of this function that fetches the supported versions and
2318    /// features from the homeserver if the [`SupportedVersions`] aren't
2319    /// found in the cache, take a look at the [`Client::supported_versions()`]
2320    /// method.
2321    ///
2322    /// If the data in the cache has expired, this will trigger a background
2323    /// task to refresh it.
2324    ///
2325    /// # Examples
2326    ///
2327    /// ```no_run
2328    /// use ruma::api::{FeatureFlag, MatrixVersion};
2329    /// # use matrix_sdk::{Client, config::SyncSettings};
2330    /// # use url::Url;
2331    /// # async {
2332    /// # let homeserver = Url::parse("http://localhost:8080")?;
2333    /// # let mut client = Client::new(homeserver).await?;
2334    ///
2335    /// let supported =
2336    ///     if let Some(supported) = client.supported_versions_cached().await? {
2337    ///         supported
2338    ///     } else {
2339    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2340    ///     };
2341    ///
2342    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2343    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2344    ///
2345    /// let msc_x_feature = FeatureFlag::from("msc_x");
2346    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2347    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2348    /// # anyhow::Ok(()) };
2349    /// ```
2350    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2351        self.supported_versions_cached_inner(false).await
2352    }
2353
2354    async fn supported_versions_cached_inner(
2355        &self,
2356        failsafe: bool,
2357    ) -> Result<Option<SupportedVersions>, StoreError> {
2358        let supported_versions_cache = &self.inner.caches.supported_versions;
2359
2360        let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2361            cached
2362        } else if let Some(stored) = self
2363            .state_store()
2364            .get_kv_data(StateStoreDataKey::SupportedVersions)
2365            .await?
2366            .and_then(|value| value.into_supported_versions())
2367        {
2368            let stored = stored.map(|response| response.supported_versions());
2369
2370            // Copy the data from the store in the in-memory cache.
2371            supported_versions_cache.set_value(stored.clone());
2372
2373            stored
2374        } else {
2375            return Ok(None);
2376        };
2377
2378        // Spawn a task to refresh the cache if it has expired and we have a valid
2379        // access token.
2380        if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2381            debug!("spawning task to refresh supported versions cache");
2382
2383            let client = self.clone();
2384            self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2385                if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2386                    warn!("failed to refresh supported versions cache: {error}");
2387                }
2388            });
2389        }
2390
2391        Ok(Some(value.into_data()))
2392    }
2393
2394    /// Get the Matrix versions supported by the homeserver by fetching them
2395    /// from the server or the cache.
2396    ///
2397    /// # Examples
2398    ///
2399    /// ```no_run
2400    /// use ruma::api::MatrixVersion;
2401    /// # use matrix_sdk::{Client, config::SyncSettings};
2402    /// # use url::Url;
2403    /// # async {
2404    /// # let homeserver = Url::parse("http://localhost:8080")?;
2405    /// # let mut client = Client::new(homeserver).await?;
2406    ///
2407    /// let server_versions = client.server_versions().await?;
2408    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2409    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2410    /// # anyhow::Ok(()) };
2411    /// ```
2412    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2413        Ok(self.supported_versions().await?.versions)
2414    }
2415
2416    /// Get the unstable features supported by the homeserver by fetching them
2417    /// from the server or the cache.
2418    ///
2419    /// # Examples
2420    ///
2421    /// ```no_run
2422    /// use matrix_sdk::ruma::api::FeatureFlag;
2423    /// # use matrix_sdk::{Client, config::SyncSettings};
2424    /// # use url::Url;
2425    /// # async {
2426    /// # let homeserver = Url::parse("http://localhost:8080")?;
2427    /// # let mut client = Client::new(homeserver).await?;
2428    ///
2429    /// let msc_x_feature = FeatureFlag::from("msc_x");
2430    /// let unstable_features = client.unstable_features().await?;
2431    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2432    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2433    /// # anyhow::Ok(()) };
2434    /// ```
2435    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2436        Ok(self.supported_versions().await?.features)
2437    }
2438
2439    /// Empty the supported versions and unstable features cache.
2440    ///
2441    /// Since the SDK caches the supported versions, it's possible to have a
2442    /// stale entry in the cache. This functions makes it possible to force
2443    /// reset it.
2444    pub async fn reset_supported_versions(&self) -> Result<()> {
2445        // Empty the in-memory cache.
2446        self.inner.caches.supported_versions.reset();
2447
2448        // Empty the store cache.
2449        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2450    }
2451
2452    /// Get the well-known file of the homeserver from the cache.
2453    ///
2454    /// If the data in the cache has expired, this will trigger a background
2455    /// task to refresh it.
2456    async fn well_known_cached(
2457        &self,
2458    ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2459        let well_known_cache = &self.inner.caches.well_known;
2460
2461        let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2462            cached
2463        } else if let Some(stored) = self
2464            .state_store()
2465            .get_kv_data(StateStoreDataKey::WellKnown)
2466            .await?
2467            .and_then(|value| value.into_well_known())
2468        {
2469            // Copy the data from the store into the in-memory cache.
2470            well_known_cache.set_value(stored.clone());
2471
2472            stored
2473        } else {
2474            return Ok(CachedValue::NotSet);
2475        };
2476
2477        // Spawn a task to refresh the cache if it has expired.
2478        if value.has_expired() {
2479            debug!("spawning task to refresh well-known cache");
2480
2481            let client = self.clone();
2482            self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2483                client.refresh_well_known_cache().await;
2484            });
2485        }
2486
2487        Ok(CachedValue::Cached(value.into_data()))
2488    }
2489
2490    /// Refresh the well-known file of the homeserver in the cache.
2491    async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2492        let well_known_cache = &self.inner.caches.well_known;
2493
2494        let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2495            Ok(guard) => guard,
2496            Err(_) => {
2497                // There is already a refresh in progress, wait for it to finish.
2498                let guard = well_known_cache.refresh_lock.lock().await;
2499
2500                // A refresh can't fail because we ignore failures, so there shouldn't be an
2501                // error in the refresh lock.
2502
2503                // Reuse the data if it was cached and it hasn't expired.
2504                if let CachedValue::Cached(value) = well_known_cache.value()
2505                    && !value.has_expired()
2506                {
2507                    return value.into_data();
2508                }
2509
2510                // The data wasn't cached or has expired, we need to make another request.
2511                guard
2512            }
2513        };
2514
2515        let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2516
2517        if let Err(err) = self
2518            .state_store()
2519            .set_kv_data(
2520                StateStoreDataKey::WellKnown,
2521                StateStoreDataValue::WellKnown(well_known.clone()),
2522            )
2523            .await
2524        {
2525            warn!("error when caching well-known: {err}");
2526        }
2527
2528        well_known_cache.set_value(well_known.clone());
2529
2530        well_known.into_data()
2531    }
2532
2533    /// Get the well-known file of the homeserver by fetching it from the server
2534    /// or the cache.
2535    async fn well_known(&self) -> Option<WellKnownResponse> {
2536        match self.well_known_cached().await {
2537            Ok(CachedValue::Cached(value)) => {
2538                return value;
2539            }
2540            Ok(CachedValue::NotSet) => {
2541                // The cache is empty, make a request.
2542            }
2543            Err(error) => {
2544                warn!("error when loading cached well-known: {error}");
2545                // Fallthrough to make a request.
2546            }
2547        }
2548
2549        self.refresh_well_known_cache().await
2550    }
2551
2552    /// Get information about the homeserver's advertised RTC foci by fetching
2553    /// the well-known file from the server or the cache.
2554    ///
2555    /// # Examples
2556    /// ```no_run
2557    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::rtc::RtcTransport};
2558    /// # use url::Url;
2559    /// # async {
2560    /// # let homeserver = Url::parse("http://localhost:8080")?;
2561    /// # let mut client = Client::new(homeserver).await?;
2562    /// let rtc_foci = client.rtc_foci().await?;
2563    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2564    ///     RtcTransport::LiveKit(info) => Some(info),
2565    ///     _ => None,
2566    /// });
2567    /// if let Some(info) = default_livekit_focus_info {
2568    ///     println!("Default LiveKit service URL: {}", info.service_url);
2569    /// }
2570    /// # anyhow::Ok(()) };
2571    /// ```
2572    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcTransport>> {
2573        let well_known = self.well_known().await;
2574
2575        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2576    }
2577
2578    /// Get information about the homeserver's advertised map tile server, if
2579    /// any, by fetching the well-known file from the server or the cache.
2580    ///
2581    /// Returns `None` if the homeserver has not advertised a tile server in its
2582    /// well-known, or if the well-known is otherwise unavailable.
2583    pub async fn tile_server(&self) -> Option<TileServerInfo> {
2584        self.well_known().await.and_then(|well_known| well_known.tile_server).map(Into::into)
2585    }
2586
2587    /// Empty the well-known cache.
2588    ///
2589    /// Since the SDK caches the well-known, it's possible to have a stale entry
2590    /// in the cache. This functions makes it possible to force reset it.
2591    pub async fn reset_well_known(&self) -> Result<()> {
2592        // Empty the in-memory caches.
2593        self.inner.caches.well_known.reset();
2594
2595        // Empty the store cache.
2596        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2597    }
2598
2599    /// Check whether MSC 4028 is enabled on the homeserver.
2600    ///
2601    /// # Examples
2602    ///
2603    /// ```no_run
2604    /// # use matrix_sdk::{Client, config::SyncSettings};
2605    /// # use url::Url;
2606    /// # async {
2607    /// # let homeserver = Url::parse("http://localhost:8080")?;
2608    /// # let mut client = Client::new(homeserver).await?;
2609    /// let msc4028_enabled =
2610    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2611    /// # anyhow::Ok(()) };
2612    /// ```
2613    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2614        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2615    }
2616
2617    /// Get information of all our own devices.
2618    ///
2619    /// # Examples
2620    ///
2621    /// ```no_run
2622    /// # use matrix_sdk::{Client, config::SyncSettings};
2623    /// # use url::Url;
2624    /// # async {
2625    /// # let homeserver = Url::parse("http://localhost:8080")?;
2626    /// # let mut client = Client::new(homeserver).await?;
2627    /// let response = client.devices().await?;
2628    ///
2629    /// for device in response.devices {
2630    ///     println!(
2631    ///         "Device: {} {}",
2632    ///         device.device_id,
2633    ///         device.display_name.as_deref().unwrap_or("")
2634    ///     );
2635    /// }
2636    /// # anyhow::Ok(()) };
2637    /// ```
2638    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2639        let request = get_devices::v3::Request::new();
2640
2641        self.send(request).await
2642    }
2643
2644    /// Delete the given devices from the server.
2645    ///
2646    /// # Arguments
2647    ///
2648    /// * `devices` - The list of devices that should be deleted from the
2649    ///   server.
2650    ///
2651    /// * `auth_data` - This request requires user interactive auth, the first
2652    ///   request needs to set this to `None` and will always fail with an
2653    ///   `UiaaResponse`. The response will contain information for the
2654    ///   interactive auth and the same request needs to be made but this time
2655    ///   with some `auth_data` provided.
2656    ///
2657    /// ```no_run
2658    /// # use matrix_sdk::{
2659    /// #    ruma::{api::client::uiaa, owned_device_id},
2660    /// #    Client, Error, config::SyncSettings,
2661    /// # };
2662    /// # use serde_json::json;
2663    /// # use url::Url;
2664    /// # use std::collections::BTreeMap;
2665    /// # async {
2666    /// # let homeserver = Url::parse("http://localhost:8080")?;
2667    /// # let mut client = Client::new(homeserver).await?;
2668    /// let devices = &[owned_device_id!("DEVICEID")];
2669    ///
2670    /// if let Err(e) = client.delete_devices(devices, None).await {
2671    ///     if let Some(info) = e.as_uiaa_response() {
2672    ///         let mut password = uiaa::Password::new(
2673    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2674    ///             "wordpass".to_owned(),
2675    ///         );
2676    ///         password.session = info.session.clone();
2677    ///
2678    ///         client
2679    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2680    ///             .await?;
2681    ///     }
2682    /// }
2683    /// # anyhow::Ok(()) };
2684    pub async fn delete_devices(
2685        &self,
2686        devices: &[OwnedDeviceId],
2687        auth_data: Option<uiaa::AuthData>,
2688    ) -> HttpResult<delete_devices::v3::Response> {
2689        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2690        request.auth = auth_data;
2691
2692        self.send(request).await
2693    }
2694
2695    /// Change the display name of a device owned by the current user.
2696    ///
2697    /// Returns a `update_device::Response` which specifies the result
2698    /// of the operation.
2699    ///
2700    /// # Arguments
2701    ///
2702    /// * `device_id` - The ID of the device to change the display name of.
2703    /// * `display_name` - The new display name to set.
2704    pub async fn rename_device(
2705        &self,
2706        device_id: &DeviceId,
2707        display_name: &str,
2708    ) -> HttpResult<update_device::v3::Response> {
2709        let mut request = update_device::v3::Request::new(device_id.to_owned());
2710        request.display_name = Some(display_name.to_owned());
2711
2712        self.send(request).await
2713    }
2714
2715    /// Check whether a device with a specific ID exists on the server.
2716    ///
2717    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2718    /// with 404 and the underlying error otherwise.
2719    ///
2720    /// # Arguments
2721    ///
2722    /// * `device_id` - The ID of the device to query.
2723    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2724        let request = device::get_device::v3::Request::new(device_id);
2725        match self.send(request).await {
2726            Ok(_) => Ok(true),
2727            Err(err) => {
2728                if let Some(error) = err.as_client_api_error()
2729                    && error.status_code == 404
2730                {
2731                    Ok(false)
2732                } else {
2733                    Err(err.into())
2734                }
2735            }
2736        }
2737    }
2738
2739    /// Synchronize the client's state with the latest state on the server.
2740    ///
2741    /// ## Syncing Events
2742    ///
2743    /// Messages or any other type of event need to be periodically fetched from
2744    /// the server, this is achieved by sending a `/sync` request to the server.
2745    ///
2746    /// The first sync is sent out without a [`token`]. The response of the
2747    /// first sync will contain a [`next_batch`] field which should then be
2748    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2749    /// don't receive the same events multiple times.
2750    ///
2751    /// ## Long Polling
2752    ///
2753    /// A sync should in the usual case always be in flight. The
2754    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2755    /// long the server will wait for new events before it will respond.
2756    /// The server will respond immediately if some new events arrive before the
2757    /// timeout has expired. If no changes arrive and the timeout expires an
2758    /// empty sync response will be sent to the client.
2759    ///
2760    /// This method of sending a request that may not receive a response
2761    /// immediately is called long polling.
2762    ///
2763    /// ## Filtering Events
2764    ///
2765    /// The number or type of messages and events that the client should receive
2766    /// from the server can be altered using a [`Filter`].
2767    ///
2768    /// Filters can be non-trivial and, since they will be sent with every sync
2769    /// request, they may take up a bunch of unnecessary bandwidth.
2770    ///
2771    /// Luckily filters can be uploaded to the server and reused using an unique
2772    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2773    /// method.
2774    ///
2775    /// # Arguments
2776    ///
2777    /// * `sync_settings` - Settings for the sync call, this allows us to set
2778    /// various options to configure the sync:
2779    ///     * [`filter`] - To configure which events we receive and which get
2780    ///       [filtered] by the server
2781    ///     * [`timeout`] - To configure our [long polling] setup.
2782    ///     * [`token`] - To tell the server which events we already received
2783    ///       and where we wish to continue syncing.
2784    ///     * [`full_state`] - To tell the server that we wish to receive all
2785    ///       state events, regardless of our configured [`token`].
2786    ///     * [`set_presence`] - To tell the server to set the presence and to
2787    ///       which state.
2788    ///
2789    /// # Examples
2790    ///
2791    /// ```no_run
2792    /// # use url::Url;
2793    /// # async {
2794    /// # let homeserver = Url::parse("http://localhost:8080")?;
2795    /// # let username = "";
2796    /// # let password = "";
2797    /// use matrix_sdk::{
2798    ///     Client, config::SyncSettings,
2799    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2800    /// };
2801    ///
2802    /// let client = Client::new(homeserver).await?;
2803    /// client.matrix_auth().login_username(username, password).send().await?;
2804    ///
2805    /// // Sync once so we receive the client state and old messages.
2806    /// client.sync_once(SyncSettings::default()).await?;
2807    ///
2808    /// // Register our handler so we start responding once we receive a new
2809    /// // event.
2810    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2811    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2812    /// });
2813    ///
2814    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2815    /// // from our `sync_once()` call automatically.
2816    /// client.sync(SyncSettings::default()).await;
2817    /// # anyhow::Ok(()) };
2818    /// ```
2819    ///
2820    /// [`sync`]: #method.sync
2821    /// [`SyncSettings`]: crate::config::SyncSettings
2822    /// [`token`]: crate::config::SyncSettings#method.token
2823    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2824    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2825    /// [`set_presence`]: ruma::presence::PresenceState
2826    /// [`filter`]: crate::config::SyncSettings#method.filter
2827    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2828    /// [`next_batch`]: SyncResponse#structfield.next_batch
2829    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2830    /// [long polling]: #long-polling
2831    /// [filtered]: #filtering-events
2832    #[instrument(skip(self))]
2833    pub async fn sync_once(
2834        &self,
2835        sync_settings: crate::config::SyncSettings,
2836    ) -> Result<SyncResponse> {
2837        // The sync might not return for quite a while due to the timeout.
2838        // We'll see if there's anything crypto related to send out before we
2839        // sync, i.e. if we closed our client after a sync but before the
2840        // crypto requests were sent out.
2841        //
2842        // This will mostly be a no-op.
2843        #[cfg(feature = "e2e-encryption")]
2844        if let Err(e) = self.send_outgoing_requests().await {
2845            error!(error = ?e, "Error while sending outgoing E2EE requests");
2846        }
2847
2848        let token = match sync_settings.token {
2849            SyncToken::Specific(token) => Some(token),
2850            SyncToken::NoToken => None,
2851            SyncToken::ReusePrevious => self.sync_token().await,
2852        };
2853
2854        let request = assign!(sync_events::v3::Request::new(), {
2855            filter: sync_settings.filter.map(|f| *f),
2856            since: token,
2857            full_state: sync_settings.full_state,
2858            set_presence: sync_settings.set_presence,
2859            timeout: sync_settings.timeout,
2860            use_state_after: true,
2861        });
2862        let mut request_config = self.request_config();
2863        if let Some(timeout) = sync_settings.timeout {
2864            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2865            request_config.timeout = Some(base_timeout + timeout);
2866        }
2867
2868        let response = self.send(request).with_request_config(request_config).await?;
2869        let next_batch = response.next_batch.clone();
2870        let response = self.process_sync(response).await?;
2871
2872        #[cfg(feature = "e2e-encryption")]
2873        if let Err(e) = self.send_outgoing_requests().await {
2874            error!(error = ?e, "Error while sending outgoing E2EE requests");
2875        }
2876
2877        self.inner.sync_beat.notify(usize::MAX);
2878
2879        Ok(SyncResponse::new(next_batch, response))
2880    }
2881
2882    /// Repeatedly synchronize the client state with the server.
2883    ///
2884    /// This method will only return on error, if cancellation is needed
2885    /// the method should be wrapped in a cancelable task or the
2886    /// [`Client::sync_with_callback`] method can be used or
2887    /// [`Client::sync_with_result_callback`] if you want to handle error
2888    /// cases in the loop, too.
2889    ///
2890    /// This method will internally call [`Client::sync_once`] in a loop.
2891    ///
2892    /// This method can be used with the [`Client::add_event_handler`]
2893    /// method to react to individual events. If you instead wish to handle
2894    /// events in a bulk manner the [`Client::sync_with_callback`],
2895    /// [`Client::sync_with_result_callback`] and
2896    /// [`Client::sync_stream`] methods can be used instead. Those methods
2897    /// repeatedly return the whole sync response.
2898    ///
2899    /// # Arguments
2900    ///
2901    /// * `sync_settings` - Settings for the sync call. *Note* that those
2902    ///   settings will be only used for the first sync call. See the argument
2903    ///   docs for [`Client::sync_once`] for more info.
2904    ///
2905    /// # Return
2906    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2907    /// up to the user of the API to check the error and decide whether the sync
2908    /// should continue or not.
2909    ///
2910    /// # Examples
2911    ///
2912    /// ```no_run
2913    /// # use url::Url;
2914    /// # async {
2915    /// # let homeserver = Url::parse("http://localhost:8080")?;
2916    /// # let username = "";
2917    /// # let password = "";
2918    /// use matrix_sdk::{
2919    ///     Client, config::SyncSettings,
2920    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2921    /// };
2922    ///
2923    /// let client = Client::new(homeserver).await?;
2924    /// client.matrix_auth().login_username(&username, &password).send().await?;
2925    ///
2926    /// // Register our handler so we start responding once we receive a new
2927    /// // event.
2928    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2929    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2930    /// });
2931    ///
2932    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2933    /// // automatically.
2934    /// client.sync(SyncSettings::default()).await?;
2935    /// # anyhow::Ok(()) };
2936    /// ```
2937    ///
2938    /// [argument docs]: #method.sync_once
2939    /// [`sync_with_callback`]: #method.sync_with_callback
2940    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2941        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2942    }
2943
2944    /// Repeatedly call sync to synchronize the client state with the server.
2945    ///
2946    /// # Arguments
2947    ///
2948    /// * `sync_settings` - Settings for the sync call. *Note* that those
2949    ///   settings will be only used for the first sync call. See the argument
2950    ///   docs for [`Client::sync_once`] for more info.
2951    ///
2952    /// * `callback` - A callback that will be called every time a successful
2953    ///   response has been fetched from the server. The callback must return a
2954    ///   boolean which signalizes if the method should stop syncing. If the
2955    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2956    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2957    ///
2958    /// # Return
2959    /// The sync runs until an error occurs or the
2960    /// callback indicates that the Loop should stop. If the callback asked for
2961    /// a regular stop, the result will be `Ok(())` otherwise the
2962    /// `Err(Error)` is returned.
2963    ///
2964    /// # Examples
2965    ///
2966    /// The following example demonstrates how to sync forever while sending all
2967    /// the interesting events through a mpsc channel to another thread e.g. a
2968    /// UI thread.
2969    ///
2970    /// ```no_run
2971    /// # use std::time::Duration;
2972    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2973    /// # use url::Url;
2974    /// # async {
2975    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2976    /// # let mut client = Client::new(homeserver).await.unwrap();
2977    ///
2978    /// use tokio::sync::mpsc::channel;
2979    ///
2980    /// let (tx, rx) = channel(100);
2981    ///
2982    /// let sync_channel = &tx;
2983    /// let sync_settings = SyncSettings::new()
2984    ///     .timeout(Duration::from_secs(30));
2985    ///
2986    /// client
2987    ///     .sync_with_callback(sync_settings, |response| async move {
2988    ///         let channel = sync_channel;
2989    ///         for (room_id, room) in response.rooms.joined {
2990    ///             for event in room.timeline.events {
2991    ///                 channel.send(event).await.unwrap();
2992    ///             }
2993    ///         }
2994    ///
2995    ///         LoopCtrl::Continue
2996    ///     })
2997    ///     .await;
2998    /// };
2999    /// ```
3000    #[instrument(skip_all)]
3001    pub async fn sync_with_callback<C>(
3002        &self,
3003        sync_settings: crate::config::SyncSettings,
3004        callback: impl Fn(SyncResponse) -> C,
3005    ) -> Result<(), Error>
3006    where
3007        C: Future<Output = LoopCtrl>,
3008    {
3009        self.sync_with_result_callback(sync_settings, |result| async {
3010            Ok(callback(result?).await)
3011        })
3012        .await
3013    }
3014
3015    /// Repeatedly call sync to synchronize the client state with the server.
3016    ///
3017    /// # Arguments
3018    ///
3019    /// * `sync_settings` - Settings for the sync call. *Note* that those
3020    ///   settings will be only used for the first sync call. See the argument
3021    ///   docs for [`Client::sync_once`] for more info.
3022    ///
3023    /// * `callback` - A callback that will be called every time after a
3024    ///   response has been received, failure or not. The callback returns a
3025    ///   `Result<LoopCtrl, Error>`, too. When returning
3026    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3027    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3028    ///   function returns `Ok(())`. In case the callback can't handle the
3029    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
3030    ///   which results in the sync ending and the `Err(Error)` being returned.
3031    ///
3032    /// # Return
3033    /// The sync runs until an error occurs that the callback can't handle or
3034    /// the callback indicates that the Loop should stop. If the callback
3035    /// asked for a regular stop, the result will be `Ok(())` otherwise the
3036    /// `Err(Error)` is returned.
3037    ///
3038    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
3039    /// this, and are handled first without sending the result to the
3040    /// callback. Only after they have exceeded is the `Result` handed to
3041    /// the callback.
3042    ///
3043    /// # Examples
3044    ///
3045    /// The following example demonstrates how to sync forever while sending all
3046    /// the interesting events through a mpsc channel to another thread e.g. a
3047    /// UI thread.
3048    ///
3049    /// ```no_run
3050    /// # use std::time::Duration;
3051    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3052    /// # use url::Url;
3053    /// # async {
3054    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3055    /// # let mut client = Client::new(homeserver).await.unwrap();
3056    /// #
3057    /// use tokio::sync::mpsc::channel;
3058    ///
3059    /// let (tx, rx) = channel(100);
3060    ///
3061    /// let sync_channel = &tx;
3062    /// let sync_settings = SyncSettings::new()
3063    ///     .timeout(Duration::from_secs(30));
3064    ///
3065    /// client
3066    ///     .sync_with_result_callback(sync_settings, |response| async move {
3067    ///         let channel = sync_channel;
3068    ///         let sync_response = response?;
3069    ///         for (room_id, room) in sync_response.rooms.joined {
3070    ///              for event in room.timeline.events {
3071    ///                  channel.send(event).await.unwrap();
3072    ///               }
3073    ///         }
3074    ///
3075    ///         Ok(LoopCtrl::Continue)
3076    ///     })
3077    ///     .await;
3078    /// };
3079    /// ```
3080    #[instrument(skip(self, callback))]
3081    pub async fn sync_with_result_callback<C>(
3082        &self,
3083        sync_settings: crate::config::SyncSettings,
3084        callback: impl Fn(Result<SyncResponse, Error>) -> C,
3085    ) -> Result<(), Error>
3086    where
3087        C: Future<Output = Result<LoopCtrl, Error>>,
3088    {
3089        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3090
3091        while let Some(result) = sync_stream.next().await {
3092            trace!("Running callback");
3093            if callback(result).await? == LoopCtrl::Break {
3094                trace!("Callback told us to stop");
3095                break;
3096            }
3097            trace!("Done running callback");
3098        }
3099
3100        Ok(())
3101    }
3102
3103    //// Repeatedly synchronize the client state with the server.
3104    ///
3105    /// This method will internally call [`Client::sync_once`] in a loop and is
3106    /// equivalent to the [`Client::sync`] method but the responses are provided
3107    /// as an async stream.
3108    ///
3109    /// # Arguments
3110    ///
3111    /// * `sync_settings` - Settings for the sync call. *Note* that those
3112    ///   settings will be only used for the first sync call. See the argument
3113    ///   docs for [`Client::sync_once`] for more info.
3114    ///
3115    /// # Examples
3116    ///
3117    /// ```no_run
3118    /// # use url::Url;
3119    /// # async {
3120    /// # let homeserver = Url::parse("http://localhost:8080")?;
3121    /// # let username = "";
3122    /// # let password = "";
3123    /// use futures_util::StreamExt;
3124    /// use matrix_sdk::{Client, config::SyncSettings};
3125    ///
3126    /// let client = Client::new(homeserver).await?;
3127    /// client.matrix_auth().login_username(&username, &password).send().await?;
3128    ///
3129    /// let mut sync_stream =
3130    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
3131    ///
3132    /// while let Some(Ok(response)) = sync_stream.next().await {
3133    ///     for room in response.rooms.joined.values() {
3134    ///         for e in &room.timeline.events {
3135    ///             if let Ok(event) = e.raw().deserialize() {
3136    ///                 println!("Received event {:?}", event);
3137    ///             }
3138    ///         }
3139    ///     }
3140    /// }
3141    ///
3142    /// # anyhow::Ok(()) };
3143    /// ```
3144    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3145    #[instrument(skip(self))]
3146    pub async fn sync_stream(
3147        &self,
3148        mut sync_settings: crate::config::SyncSettings,
3149    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3150        let mut is_first_sync = true;
3151        let mut timeout = None;
3152        let mut last_sync_time: Option<Instant> = None;
3153
3154        let parent_span = Span::current();
3155
3156        async_stream::stream!({
3157            loop {
3158                trace!("Syncing");
3159
3160                if sync_settings.ignore_timeout_on_first_sync {
3161                    if is_first_sync {
3162                        timeout = sync_settings.timeout.take();
3163                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
3164                        sync_settings.timeout = timeout.take();
3165                    }
3166
3167                    is_first_sync = false;
3168                }
3169
3170                yield self
3171                    .sync_loop_helper(&mut sync_settings)
3172                    .instrument(parent_span.clone())
3173                    .await;
3174
3175                Client::delay_sync(&mut last_sync_time).await
3176            }
3177        })
3178    }
3179
3180    /// Get the current, if any, sync token of the client.
3181    /// This will be None if the client didn't sync at least once.
3182    pub(crate) async fn sync_token(&self) -> Option<String> {
3183        self.inner.base_client.sync_token().await
3184    }
3185
3186    /// Gets information about the owner of a given access token.
3187    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3188        let request = whoami::v3::Request::new();
3189        self.send(request).await
3190    }
3191
3192    /// Subscribes a new receiver to client SessionChange broadcasts.
3193    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3194        let broadcast = &self.auth_ctx().session_change_sender;
3195        broadcast.subscribe()
3196    }
3197
3198    /// Sets the save/restore session callbacks.
3199    ///
3200    /// This is another mechanism to get synchronous updates to session tokens,
3201    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3202    pub fn set_session_callbacks(
3203        &self,
3204        reload_session_callback: Box<ReloadSessionCallback>,
3205        save_session_callback: Box<SaveSessionCallback>,
3206    ) -> Result<()> {
3207        self.inner
3208            .auth_ctx
3209            .reload_session_callback
3210            .set(reload_session_callback)
3211            .map_err(|_| Error::MultipleSessionCallbacks)?;
3212
3213        self.inner
3214            .auth_ctx
3215            .save_session_callback
3216            .set(save_session_callback)
3217            .map_err(|_| Error::MultipleSessionCallbacks)?;
3218
3219        Ok(())
3220    }
3221
3222    /// Get the notification settings of the current owner of the client.
3223    pub async fn notification_settings(&self) -> NotificationSettings {
3224        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3225        NotificationSettings::new(self.clone(), ruleset)
3226    }
3227
3228    /// Create a new specialized `Client` that can process notifications.
3229    ///
3230    /// See [`CrossProcessLock::new`] to learn more about
3231    /// `cross_process_lock_config`.
3232    ///
3233    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3234    pub async fn notification_client(
3235        &self,
3236        cross_process_lock_config: CrossProcessLockConfig,
3237    ) -> Result<Client> {
3238        let client = Client {
3239            inner: ClientInner::new(
3240                self.inner.auth_ctx.clone(),
3241                self.server().cloned(),
3242                self.homeserver(),
3243                self.sliding_sync_version(),
3244                self.inner.http_client.clone(),
3245                self.inner
3246                    .base_client
3247                    .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3248                    .await?,
3249                self.inner.caches.supported_versions.value(),
3250                self.inner.caches.well_known.value(),
3251                self.inner.respect_login_well_known,
3252                self.inner.event_cache.clone(),
3253                self.inner.send_queue_data.clone(),
3254                self.inner.latest_events.clone(),
3255                #[cfg(feature = "e2e-encryption")]
3256                self.inner.e2ee.encryption_settings,
3257                #[cfg(feature = "e2e-encryption")]
3258                self.inner.enable_share_history_on_invite,
3259                cross_process_lock_config,
3260                #[cfg(feature = "experimental-search")]
3261                self.inner.search_index.clone(),
3262                self.inner.thread_subscription_catchup.clone(),
3263                self.inner.media_fetcher.clone(),
3264            )
3265            .await,
3266        };
3267
3268        Ok(client)
3269    }
3270
3271    /// The [`EventCache`] instance for this [`Client`].
3272    pub fn event_cache(&self) -> &EventCache {
3273        // SAFETY: always initialized in the `Client` ctor.
3274        self.inner.event_cache.get().unwrap()
3275    }
3276
3277    /// The [`LatestEvents`] instance for this [`Client`].
3278    pub async fn latest_events(&self) -> &LatestEvents {
3279        self.inner
3280            .latest_events
3281            .get_or_init(|| async {
3282                LatestEvents::new(
3283                    WeakClient::from_client(self),
3284                    self.event_cache().clone(),
3285                    SendQueue::new(self.clone()),
3286                    self.room_info_notable_update_receiver(),
3287                )
3288            })
3289            .await
3290    }
3291
3292    /// Waits until an at least partially synced room is received, and returns
3293    /// it.
3294    ///
3295    /// **Note: this function will loop endlessly until either it finds the room
3296    /// or an externally set timeout happens.**
3297    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3298        loop {
3299            if let Some(room) = self.get_room(room_id) {
3300                if room.is_state_partially_or_fully_synced() {
3301                    debug!("Found just created room!");
3302                    return room;
3303                }
3304                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3305            } else {
3306                debug!("Room wasn't found, waiting for sync beat to try again");
3307            }
3308            self.inner.sync_beat.listen().await;
3309        }
3310    }
3311
3312    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3313    /// join it.
3314    pub async fn knock(
3315        &self,
3316        room_id_or_alias: OwnedRoomOrAliasId,
3317        reason: Option<String>,
3318        server_names: Vec<OwnedServerName>,
3319    ) -> Result<Room> {
3320        let request =
3321            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3322        let response = self.send(request).await?;
3323        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3324        Ok(Room::new(self.clone(), base_room))
3325    }
3326
3327    /// Checks whether the provided `user_id` belongs to an ignored user.
3328    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3329        self.base_client().is_user_ignored(user_id).await
3330    }
3331
3332    /// Gets the `max_upload_size` value from the homeserver, getting either a
3333    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3334    /// missing.
3335    ///
3336    /// Check the spec for more info:
3337    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3338    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3339        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3340        if let Some(data) = max_upload_size_lock.get() {
3341            return Ok(data.to_owned());
3342        }
3343
3344        // Use the authenticated endpoint when the server supports it.
3345        let supported_versions = self.supported_versions().await?;
3346        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3347            .is_supported(&supported_versions);
3348
3349        let upload_size = if use_auth {
3350            self.send(authenticated_media::get_media_config::v1::Request::default())
3351                .await?
3352                .upload_size
3353        } else {
3354            #[allow(deprecated)]
3355            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3356        };
3357
3358        match max_upload_size_lock.set(upload_size) {
3359            Ok(_) => Ok(upload_size),
3360            Err(error) => {
3361                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3362            }
3363        }
3364    }
3365
3366    /// The settings to use for decrypting events.
3367    #[cfg(feature = "e2e-encryption")]
3368    pub fn decryption_settings(&self) -> &DecryptionSettings {
3369        &self.base_client().decryption_settings
3370    }
3371
3372    /// Returns the [`SearchIndex`] for this [`Client`].
3373    #[cfg(feature = "experimental-search")]
3374    pub fn search_index(&self) -> &SearchIndex {
3375        &self.inner.search_index
3376    }
3377
3378    /// Whether the client is configured to take thread subscriptions (MSC4306
3379    /// and MSC4308) into account, and the server enabled the experimental
3380    /// feature flag for it.
3381    ///
3382    /// This may cause filtering out of thread subscriptions, and loading the
3383    /// thread subscriptions via the sliding sync extension, when the room
3384    /// list service is being used.
3385    ///
3386    /// This is async and fallible as it may use the network to retrieve the
3387    /// server supported features, if they aren't cached already.
3388    pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3389        // Check if the client is configured to support thread subscriptions first.
3390        match self.base_client().threading_support {
3391            ThreadingSupport::Enabled { with_subscriptions: false }
3392            | ThreadingSupport::Disabled => return Ok(false),
3393            ThreadingSupport::Enabled { with_subscriptions: true } => {}
3394        }
3395
3396        // Now, let's check that the server supports it.
3397        let server_enabled = self
3398            .supported_versions()
3399            .await?
3400            .features
3401            .contains(&FeatureFlag::from("org.matrix.msc4306"));
3402
3403        Ok(server_enabled)
3404    }
3405
3406    /// Fetch thread subscriptions changes between `from` and up to `to`.
3407    ///
3408    /// The `limit` optional parameter can be used to limit the number of
3409    /// entries in a response. It can also be overridden by the server, if
3410    /// it's deemed too large.
3411    pub async fn fetch_thread_subscriptions(
3412        &self,
3413        from: Option<String>,
3414        to: Option<String>,
3415        limit: Option<UInt>,
3416    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3417        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3418            from,
3419            to,
3420            limit,
3421        });
3422        Ok(self.send(request).await?)
3423    }
3424
3425    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3426        self.inner.thread_subscription_catchup.get().unwrap()
3427    }
3428
3429    /// Pause the client for background suspension.
3430    ///
3431    /// This method:
3432    /// 1. Disables all send queues (prevents new message sends).
3433    /// 2. Pauses all database stores, waiting for in-flight operations and
3434    ///    releasing all connections and file locks.
3435    ///
3436    /// Call [`Client::resume()`] when the app returns to the foreground.
3437    ///
3438    /// # iOS
3439    ///
3440    /// Call this before the app is suspended to avoid `0xdead10cc` kills.
3441    /// Typically called from
3442    /// [`applicationDidEnterBackground`](https://developer.apple.com/documentation/uikit/uiapplicationdelegate/applicationdidenterbackground(_:))
3443    /// or an equivalent SwiftUI lifecycle event, *after* stopping the
3444    /// `matrix_sdk_ui::sync_service::SyncService`.
3445    pub async fn pause(&self) -> Result<()> {
3446        info!("Client::pause — releasing database resources");
3447
3448        // Disable send queues so no new sends hit the stores.
3449        self.send_queue().set_enabled(false).await;
3450
3451        // Close all stores (waits for in-flight ops, closes connections).
3452        self.base_client().close_stores().await?;
3453
3454        info!("Client::pause — complete, all database connections released");
3455        Ok(())
3456    }
3457
3458    /// Resume the client after a [`Client::pause()`].
3459    ///
3460    /// Re-acquires store resources and re-enables send queues.
3461    ///
3462    /// If your app stopped the `matrix_sdk_ui::sync_service::SyncService`
3463    /// before pausing, restart it separately as appropriate for your app
3464    /// lifecycle.
3465    pub async fn resume(&self) -> Result<()> {
3466        info!("Client::resume — re-acquiring database resources");
3467
3468        // Reopen stores (creates new connection pools).
3469        self.base_client().reopen_stores().await?;
3470
3471        // Re-enable send queues.
3472        self.send_queue().set_enabled(true).await;
3473
3474        info!("Client::resume — complete");
3475        Ok(())
3476    }
3477
3478    /// Perform database optimizations if any are available, i.e. vacuuming in
3479    /// SQLite.
3480    ///
3481    /// **Warning:** this was added to check if SQLite fragmentation was the
3482    /// source of performance issues, **DO NOT use in production**.
3483    #[doc(hidden)]
3484    pub async fn optimize_stores(&self) -> Result<()> {
3485        trace!("Optimizing state store...");
3486        self.state_store().optimize().await?;
3487
3488        trace!("Optimizing event cache store...");
3489        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3490            clean_lock.optimize().await?;
3491        }
3492
3493        trace!("Optimizing media store...");
3494        self.media_store().lock().await?.optimize().await?;
3495
3496        Ok(())
3497    }
3498
3499    /// Returns the sizes of the existing stores, if known.
3500    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3501        #[cfg(feature = "e2e-encryption")]
3502        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3503            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3504        {
3505            Some(store_size)
3506        } else {
3507            None
3508        };
3509        #[cfg(not(feature = "e2e-encryption"))]
3510        let crypto_store_size = None;
3511
3512        let state_store_size = self.state_store().get_size().await.ok().flatten();
3513
3514        let event_cache_store_size = if let Some(clean_lock) =
3515            self.event_cache_store().lock().await?.as_clean()
3516            && let Ok(Some(store_size)) = clean_lock.get_size().await
3517        {
3518            Some(store_size)
3519        } else {
3520            None
3521        };
3522
3523        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3524
3525        Ok(StoreSizes {
3526            crypto_store: crypto_store_size,
3527            state_store: state_store_size,
3528            event_cache_store: event_cache_store_size,
3529            media_store: media_store_size,
3530        })
3531    }
3532
3533    /// Get a reference to the client's task monitor, for spawning background
3534    /// tasks.
3535    pub fn task_monitor(&self) -> &TaskMonitor {
3536        &self.inner.task_monitor
3537    }
3538
3539    /// Add a subscriber for duplicate key upload error notifications triggered
3540    /// by requests to /keys/upload.
3541    #[cfg(feature = "e2e-encryption")]
3542    pub fn subscribe_to_duplicate_key_upload_errors(
3543        &self,
3544    ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3545        self.inner.duplicate_key_upload_error_sender.subscribe()
3546    }
3547
3548    /// Check the record of whether we are waiting for an [MSC4268] key bundle
3549    /// for the given room.
3550    ///
3551    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3552    #[cfg(feature = "e2e-encryption")]
3553    pub async fn get_pending_key_bundle_details_for_room(
3554        &self,
3555        room_id: &RoomId,
3556    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3557        Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3558    }
3559
3560    /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3561    /// a DM.
3562    pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3563        &self.inner.base_client.dm_room_definition
3564    }
3565}
3566
3567/// Contains the disk size of the different stores, if known. It won't be
3568/// available for in-memory stores.
3569#[derive(Debug, Clone)]
3570pub struct StoreSizes {
3571    /// The size of the CryptoStore.
3572    pub crypto_store: Option<usize>,
3573    /// The size of the StateStore.
3574    pub state_store: Option<usize>,
3575    /// The size of the EventCacheStore.
3576    pub event_cache_store: Option<usize>,
3577    /// The size of the MediaStore.
3578    pub media_store: Option<usize>,
3579}
3580
3581#[cfg(any(feature = "testing", test))]
3582impl Client {
3583    /// Test helper to mark users as tracked by the crypto layer.
3584    #[cfg(feature = "e2e-encryption")]
3585    pub async fn update_tracked_users_for_testing(
3586        &self,
3587        user_ids: impl IntoIterator<Item = &UserId>,
3588    ) {
3589        let olm = self.olm_machine().await;
3590        let olm = olm.as_ref().unwrap();
3591        olm.update_tracked_users(user_ids).await.unwrap();
3592    }
3593}
3594
3595/// A weak reference to the inner client, useful when trying to get a handle
3596/// on the owning client.
3597#[derive(Clone, Debug)]
3598pub(crate) struct WeakClient {
3599    client: Weak<ClientInner>,
3600}
3601
3602impl WeakClient {
3603    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3604    pub(crate) fn from_inner(client: &Arc<ClientInner>) -> Self {
3605        Self { client: Arc::downgrade(client) }
3606    }
3607
3608    /// Construct a [`WeakClient`] from a [`Client`].
3609    pub fn from_client(client: &Client) -> Self {
3610        Self::from_inner(&client.inner)
3611    }
3612
3613    /// Attempts to get a [`Client`] from this [`WeakClient`].
3614    pub fn get(&self) -> Option<Client> {
3615        self.client.upgrade().map(|inner| Client { inner })
3616    }
3617
3618    /// Gets the number of strong (`Arc`) pointers still pointing to this
3619    /// client.
3620    #[allow(dead_code)]
3621    pub fn strong_count(&self) -> usize {
3622        self.client.strong_count()
3623    }
3624}
3625
3626/// Information about the state of a room before we joined it.
3627#[derive(Debug, Clone, Default)]
3628struct PreJoinRoomInfo {
3629    /// The user who invited us to the room, if any.
3630    pub inviter: Option<RoomMember>,
3631}
3632
3633// The http mocking library is not supported for wasm32
3634#[cfg(all(test, not(target_family = "wasm")))]
3635pub(crate) mod tests {
3636    use std::{sync::Arc, time::Duration};
3637
3638    use assert_matches::assert_matches;
3639    use assert_matches2::assert_let;
3640    use eyeball::SharedObservable;
3641    use futures_util::{FutureExt, StreamExt, pin_mut};
3642    use js_int::{UInt, uint};
3643    use matrix_sdk_base::{
3644        RoomState,
3645        store::{MemoryStore, StoreConfig},
3646        ttl::TtlValue,
3647    };
3648    use matrix_sdk_test::{
3649        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3650        event_factory::EventFactory,
3651    };
3652    #[cfg(target_family = "wasm")]
3653    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3654
3655    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3656    use ruma::{
3657        RoomId, ServerName, UserId,
3658        api::{
3659            FeatureFlag, MatrixVersion,
3660            client::{room::create_room::v3::Request as CreateRoomRequest, rtc::RtcTransport},
3661        },
3662        assign,
3663        events::{
3664            ignored_user_list::IgnoredUserListEventContent,
3665            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3666        },
3667        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3668    };
3669    use serde_json::json;
3670    use stream_assert::{assert_next_matches, assert_pending};
3671    use tokio::{
3672        spawn,
3673        time::{sleep, timeout},
3674    };
3675    use url::Url;
3676
3677    use super::Client;
3678    use crate::{
3679        Error, TransmissionProgress,
3680        client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3681        config::{RequestConfig, SyncSettings},
3682        futures::SendRequest,
3683        media::MediaError,
3684        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3685    };
3686
3687    #[async_test]
3688    async fn test_account_data() {
3689        let server = MatrixMockServer::new().await;
3690        let client = server.client_builder().build().await;
3691
3692        let f = EventFactory::new();
3693        server
3694            .mock_sync()
3695            .ok_and_run(&client, |builder| {
3696                builder.add_global_account_data(
3697                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3698                );
3699            })
3700            .await;
3701
3702        let content = client
3703            .account()
3704            .account_data::<IgnoredUserListEventContent>()
3705            .await
3706            .unwrap()
3707            .unwrap()
3708            .deserialize()
3709            .unwrap();
3710
3711        assert_eq!(content.ignored_users.len(), 1);
3712    }
3713
3714    #[async_test]
3715    async fn test_successful_discovery() {
3716        // Imagine this is `matrix.org`.
3717        let server = MatrixMockServer::new().await;
3718        let server_url = server.uri();
3719
3720        // Imagine this is `matrix-client.matrix.org`.
3721        let homeserver = MatrixMockServer::new().await;
3722        let homeserver_url = homeserver.uri();
3723
3724        // Imagine Alice has the user ID `@alice:matrix.org`.
3725        let domain = server_url.strip_prefix("http://").unwrap();
3726        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3727
3728        // The `.well-known` is on the server (e.g. `matrix.org`).
3729        server
3730            .mock_well_known()
3731            .ok_with_homeserver_url(&homeserver_url)
3732            .mock_once()
3733            .named("well-known")
3734            .mount()
3735            .await;
3736
3737        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3738        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3739
3740        let client = Client::builder()
3741            .insecure_server_name_no_tls(alice.server_name())
3742            .build()
3743            .await
3744            .unwrap();
3745
3746        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3747        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3748        client.server_versions().await.unwrap();
3749    }
3750
3751    #[async_test]
3752    async fn test_discovery_broken_server() {
3753        let server = MatrixMockServer::new().await;
3754        let server_url = server.uri();
3755        let domain = server_url.strip_prefix("http://").unwrap();
3756        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3757
3758        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3759
3760        assert!(
3761            Client::builder()
3762                .insecure_server_name_no_tls(alice.server_name())
3763                .build()
3764                .await
3765                .is_err(),
3766            "Creating a client from a user ID should fail when the .well-known request fails."
3767        );
3768    }
3769
3770    #[async_test]
3771    async fn test_room_creation() {
3772        let server = MatrixMockServer::new().await;
3773        let client = server.client_builder().build().await;
3774
3775        let f = EventFactory::new().sender(user_id!("@example:localhost"));
3776        server
3777            .mock_sync()
3778            .ok_and_run(&client, |builder| {
3779                builder.add_joined_room(
3780                    JoinedRoomBuilder::default()
3781                        .add_state_event(
3782                            f.member(user_id!("@example:localhost")).display_name("example"),
3783                        )
3784                        .add_state_event(f.default_power_levels()),
3785                );
3786            })
3787            .await;
3788
3789        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3790        assert_eq!(room.state(), RoomState::Joined);
3791    }
3792
3793    #[async_test]
3794    async fn test_retry_limit_http_requests() {
3795        let server = MatrixMockServer::new().await;
3796        let client = server
3797            .client_builder()
3798            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3799            .build()
3800            .await;
3801
3802        assert!(client.request_config().retry_limit.unwrap() == 4);
3803
3804        server.mock_who_am_i().error500().expect(4).mount().await;
3805
3806        client.whoami().await.unwrap_err();
3807    }
3808
3809    #[async_test]
3810    async fn test_retry_timeout_http_requests() {
3811        // Keep this timeout small so that the test doesn't take long
3812        let retry_timeout = Duration::from_secs(5);
3813        let server = MatrixMockServer::new().await;
3814        let client = server
3815            .client_builder()
3816            .on_builder(|builder| {
3817                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3818            })
3819            .build()
3820            .await;
3821
3822        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3823
3824        server.mock_login().error500().expect(2..).mount().await;
3825
3826        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3827    }
3828
3829    #[async_test]
3830    async fn test_short_retry_initial_http_requests() {
3831        let server = MatrixMockServer::new().await;
3832        let client = server
3833            .client_builder()
3834            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3835            .build()
3836            .await;
3837
3838        server.mock_login().error500().expect(3..).mount().await;
3839
3840        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3841    }
3842
3843    #[async_test]
3844    async fn test_no_retry_http_requests() {
3845        let server = MatrixMockServer::new().await;
3846        let client = server.client_builder().build().await;
3847
3848        server.mock_devices().error500().mock_once().mount().await;
3849
3850        client.devices().await.unwrap_err();
3851    }
3852
3853    #[async_test]
3854    async fn test_set_homeserver() {
3855        let client = MockClientBuilder::new(None).build().await;
3856        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3857
3858        let homeserver = Url::parse("http://example.com/").unwrap();
3859        client.set_homeserver(homeserver.clone());
3860        assert_eq!(client.homeserver(), homeserver);
3861    }
3862
3863    #[async_test]
3864    async fn test_search_user_request() {
3865        let server = MatrixMockServer::new().await;
3866        let client = server.client_builder().build().await;
3867
3868        server.mock_user_directory().ok().mock_once().mount().await;
3869
3870        let response = client.search_users("test", 50).await.unwrap();
3871        assert_eq!(response.results.len(), 1);
3872        let result = &response.results[0];
3873        assert_eq!(result.user_id.to_string(), "@test:example.me");
3874        assert_eq!(result.display_name.clone().unwrap(), "Test");
3875        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3876        assert!(!response.limited);
3877    }
3878
3879    #[async_test]
3880    async fn test_request_unstable_features() {
3881        let server = MatrixMockServer::new().await;
3882        let client = server.client_builder().no_server_versions().build().await;
3883
3884        server
3885            .mock_versions()
3886            .with_feature("org.matrix.e2e_cross_signing", true)
3887            .ok()
3888            .mock_once()
3889            .mount()
3890            .await;
3891
3892        let unstable_features = client.unstable_features().await.unwrap();
3893        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3894        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3895    }
3896
3897    #[async_test]
3898    async fn test_can_homeserver_push_encrypted_event_to_device() {
3899        let server = MatrixMockServer::new().await;
3900        let client = server.client_builder().no_server_versions().build().await;
3901
3902        server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3903
3904        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3905        assert!(msc4028_enabled);
3906    }
3907
3908    #[async_test]
3909    async fn test_recently_visited_rooms() {
3910        // Tracking recently visited rooms requires authentication
3911        let client = MockClientBuilder::new(None).unlogged().build().await;
3912        assert_matches!(
3913            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3914            Err(Error::AuthenticationRequired)
3915        );
3916
3917        let client = MockClientBuilder::new(None).build().await;
3918        let account = client.account();
3919
3920        // We should start off with an empty list
3921        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3922
3923        // Tracking a valid room id should add it to the list
3924        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3925        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3926        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3927
3928        // And the existing list shouldn't be changed
3929        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3930        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3931
3932        // Tracking the same room again shouldn't change the list
3933        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3934        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3935        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3936
3937        // Tracking a second room should add it to the front of the list
3938        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3939        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3940        assert_eq!(
3941            account.get_recently_visited_rooms().await.unwrap(),
3942            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3943        );
3944
3945        // Tracking the first room yet again should move it to the front of the list
3946        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3947        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3948        assert_eq!(
3949            account.get_recently_visited_rooms().await.unwrap(),
3950            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3951        );
3952
3953        // Tracking should be capped at 20
3954        for n in 0..20 {
3955            account
3956                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3957                .await
3958                .unwrap();
3959        }
3960
3961        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3962
3963        // And the initial rooms should've been pushed out
3964        let rooms = account.get_recently_visited_rooms().await.unwrap();
3965        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3966        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3967
3968        // And the last tracked room should be the first
3969        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3970    }
3971
3972    #[async_test]
3973    async fn test_client_no_cycle_with_event_cache() {
3974        let client = MockClientBuilder::new(None).build().await;
3975
3976        // Wait for the init tasks to die.
3977        sleep(Duration::from_secs(1)).await;
3978
3979        let weak_client = WeakClient::from_client(&client);
3980        assert_eq!(weak_client.strong_count(), 1);
3981
3982        {
3983            let room_id = room_id!("!room:example.org");
3984
3985            // Have the client know the room.
3986            let response = SyncResponseBuilder::default()
3987                .add_joined_room(JoinedRoomBuilder::new(room_id))
3988                .build_sync_response();
3989            client.inner.base_client.receive_sync_response(response).await.unwrap();
3990
3991            client.event_cache().subscribe().unwrap();
3992
3993            let (_room_event_cache, _drop_handles) =
3994                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3995        }
3996
3997        drop(client);
3998
3999        // Give a bit of time for background tasks to die.
4000        sleep(Duration::from_secs(1)).await;
4001
4002        // The weak client must be the last reference to the client now.
4003        assert_eq!(weak_client.strong_count(), 0);
4004        let client = weak_client.get();
4005        assert!(
4006            client.is_none(),
4007            "too many strong references to the client: {}",
4008            Arc::strong_count(&client.unwrap().inner)
4009        );
4010    }
4011
4012    #[async_test]
4013    async fn test_supported_versions_caching() {
4014        let server = MatrixMockServer::new().await;
4015
4016        let versions_mock = server
4017            .mock_versions()
4018            .expect_default_access_token()
4019            .with_feature("org.matrix.e2e_cross_signing", true)
4020            .ok()
4021            .named("first versions mock")
4022            .expect(1)
4023            .mount_as_scoped()
4024            .await;
4025
4026        let memory_store = Arc::new(MemoryStore::new());
4027        let client = server
4028            .client_builder()
4029            .no_server_versions()
4030            .on_builder(|builder| {
4031                builder.store_config(
4032                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4033                        .state_store(memory_store.clone()),
4034                )
4035            })
4036            .build()
4037            .await;
4038
4039        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4040
4041        // The result was cached.
4042        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
4043        // This subsequent call hits the in-memory cache.
4044        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4045
4046        drop(client);
4047
4048        let client = server
4049            .client_builder()
4050            .no_server_versions()
4051            .on_builder(|builder| {
4052                builder.store_config(
4053                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4054                        .state_store(memory_store.clone()),
4055                )
4056            })
4057            .build()
4058            .await;
4059
4060        // These calls to the new client hit the on-disk cache.
4061        assert!(
4062            client
4063                .unstable_features()
4064                .await
4065                .unwrap()
4066                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
4067        );
4068
4069        let supported = client.supported_versions().await.unwrap();
4070        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4071        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4072
4073        // Then this call hits the in-memory cache.
4074        let supported = client.supported_versions().await.unwrap();
4075        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4076        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4077
4078        drop(versions_mock);
4079
4080        // Now, reset the cache, and observe the endpoint being called again once.
4081        client.reset_supported_versions().await.unwrap();
4082
4083        server.mock_versions().ok().expect(2).named("second versions mock").mount().await;
4084
4085        // Hits network again.
4086        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4087        // Hits in-memory cache again.
4088        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4089        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4090
4091        // Force an expiry of the data.
4092        let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
4093        let mut ttl_value = TtlValue::new(supported_versions);
4094        ttl_value.expire();
4095        client.inner.caches.supported_versions.set_value(ttl_value);
4096
4097        // Call the method to trigger a cache refresh background task.
4098        client.supported_versions_cached().await.unwrap().unwrap();
4099
4100        // We wait for the task to finish, the endpoint should have been called again.
4101        sleep(Duration::from_secs(1)).await;
4102        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4103    }
4104
4105    #[async_test]
4106    async fn test_well_known_caching() {
4107        let server = MatrixMockServer::new().await;
4108        let server_url = server.uri();
4109        let domain = server_url.strip_prefix("http://").unwrap();
4110        let server_name = <&ServerName>::try_from(domain).unwrap();
4111        let rtc_foci = vec![RtcTransport::livekit("https://livekit.example.com".to_owned())];
4112
4113        let well_known_mock = server
4114            .mock_well_known()
4115            .ok()
4116            .named("well known mock")
4117            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
4118            .mount_as_scoped()
4119            .await;
4120
4121        let memory_store = Arc::new(MemoryStore::new());
4122        let client = Client::builder()
4123            .insecure_server_name_no_tls(server_name)
4124            .store_config(
4125                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4126                    .state_store(memory_store.clone()),
4127            )
4128            .build()
4129            .await
4130            .unwrap();
4131
4132        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4133
4134        // This subsequent call hits the in-memory cache.
4135        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4136
4137        drop(client);
4138
4139        let client = server
4140            .client_builder()
4141            .no_server_versions()
4142            .on_builder(|builder| {
4143                builder.store_config(
4144                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4145                        .state_store(memory_store.clone()),
4146                )
4147            })
4148            .build()
4149            .await;
4150
4151        // This call to the new client hits the on-disk cache.
4152        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4153
4154        // Then this call hits the in-memory cache.
4155        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4156
4157        drop(well_known_mock);
4158
4159        // Now, reset the cache, and observe the endpoints being called again once.
4160        client.reset_well_known().await.unwrap();
4161
4162        server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
4163
4164        // Hits network again.
4165        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4166        // Hits in-memory cache again.
4167        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4168
4169        // Force an expiry of the data.
4170        let well_known = client.well_known().await;
4171        let mut ttl_value = TtlValue::new(well_known);
4172        ttl_value.expire();
4173        client.inner.caches.well_known.set_value(ttl_value);
4174
4175        // Call the method again to trigger a cache refresh background task.
4176        client.well_known().await;
4177
4178        // We wait for the task to finish, the endpoint should have been called again.
4179        // We need to wait a bit because the first requests using the server name of the
4180        // user will fail, only the requests using the homeserver URL will succeed.
4181        sleep(Duration::from_secs(5)).await;
4182        assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
4183    }
4184
4185    #[async_test]
4186    async fn test_missing_well_known_caching() {
4187        let server = MatrixMockServer::new().await;
4188        let rtc_foci: Vec<RtcTransport> = vec![];
4189
4190        let well_known_mock = server
4191            .mock_well_known()
4192            .error_unrecognized()
4193            .named("first well-known mock")
4194            .expect(1)
4195            .mount_as_scoped()
4196            .await;
4197
4198        let memory_store = Arc::new(MemoryStore::new());
4199        let client = server
4200            .client_builder()
4201            .on_builder(|builder| {
4202                builder.store_config(
4203                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4204                        .state_store(memory_store.clone()),
4205                )
4206            })
4207            .build()
4208            .await;
4209
4210        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4211
4212        // This subsequent call hits the in-memory cache.
4213        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4214
4215        drop(client);
4216
4217        let client = server
4218            .client_builder()
4219            .on_builder(|builder| {
4220                builder.store_config(
4221                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4222                        .state_store(memory_store.clone()),
4223                )
4224            })
4225            .build()
4226            .await;
4227
4228        // This call to the new client hits the on-disk cache.
4229        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4230
4231        // Then this call hits the in-memory cache.
4232        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4233
4234        drop(well_known_mock);
4235
4236        // Now, reset the cache, and observe the endpoints being called again once.
4237        client.reset_well_known().await.unwrap();
4238
4239        server
4240            .mock_well_known()
4241            .error_unrecognized()
4242            .expect(1)
4243            .named("second well-known mock")
4244            .mount()
4245            .await;
4246
4247        // Hits network again.
4248        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4249        // Hits in-memory cache again.
4250        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4251    }
4252
4253    #[async_test]
4254    async fn test_no_network_doesnt_cause_infinite_retries() {
4255        // We want infinite retries for transient errors.
4256        let client = MockClientBuilder::new(None)
4257            .on_builder(|builder| builder.request_config(RequestConfig::new()))
4258            .build()
4259            .await;
4260
4261        // We don't define a mock server on purpose here, so that the error is really a
4262        // network error.
4263        client.whoami().await.unwrap_err();
4264    }
4265
4266    #[async_test]
4267    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4268        let server = MatrixMockServer::new().await;
4269        let client = server.client_builder().build().await;
4270
4271        let room_id = room_id!("!room:example.org");
4272
4273        server
4274            .mock_sync()
4275            .ok_and_run(&client, |builder| {
4276                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4277            })
4278            .await;
4279
4280        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4281        assert_eq!(room.room_id(), room_id);
4282    }
4283
4284    #[async_test]
4285    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4286        let server = MatrixMockServer::new().await;
4287        let client = server.client_builder().build().await;
4288
4289        let room_id = room_id!("!room:example.org");
4290
4291        let client = Arc::new(client);
4292
4293        // Perform the /sync request with a delay so it starts after the
4294        // `await_room_remote_echo` call has happened
4295        spawn({
4296            let client = client.clone();
4297            async move {
4298                sleep(Duration::from_millis(100)).await;
4299
4300                server
4301                    .mock_sync()
4302                    .ok_and_run(&client, |builder| {
4303                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4304                    })
4305                    .await;
4306            }
4307        });
4308
4309        let room =
4310            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4311        assert_eq!(room.room_id(), room_id);
4312    }
4313
4314    #[async_test]
4315    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4316        let client = MockClientBuilder::new(None).build().await;
4317
4318        let room_id = room_id!("!room:example.org");
4319        // Room is not present so the client won't be able to find it. The call will
4320        // timeout.
4321        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4322    }
4323
4324    #[async_test]
4325    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4326        let server = MatrixMockServer::new().await;
4327        let client = server.client_builder().build().await;
4328
4329        server.mock_create_room().ok().mount().await;
4330
4331        // Create a room in the internal store
4332        let room = client
4333            .create_room(assign!(CreateRoomRequest::new(), {
4334                invite: vec![],
4335                is_direct: false,
4336            }))
4337            .await
4338            .unwrap();
4339
4340        // Room is locally present, but not synced, the call will timeout
4341        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4342            .await
4343            .unwrap_err();
4344    }
4345
4346    #[async_test]
4347    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4348        let server = MatrixMockServer::new().await;
4349        let client = server.client_builder().build().await;
4350
4351        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4352
4353        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4354        assert_matches!(ret, Ok(true));
4355    }
4356
4357    #[async_test]
4358    async fn test_is_room_alias_available_if_alias_is_resolved() {
4359        let server = MatrixMockServer::new().await;
4360        let client = server.client_builder().build().await;
4361
4362        server
4363            .mock_room_directory_resolve_alias()
4364            .ok("!some_room_id:matrix.org", Vec::new())
4365            .expect(1)
4366            .mount()
4367            .await;
4368
4369        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4370        assert_matches!(ret, Ok(false));
4371    }
4372
4373    #[async_test]
4374    async fn test_is_room_alias_available_if_error_found() {
4375        let server = MatrixMockServer::new().await;
4376        let client = server.client_builder().build().await;
4377
4378        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4379
4380        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4381        assert_matches!(ret, Err(_));
4382    }
4383
4384    #[async_test]
4385    async fn test_create_room_alias() {
4386        let server = MatrixMockServer::new().await;
4387        let client = server.client_builder().build().await;
4388
4389        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4390
4391        let ret = client
4392            .create_room_alias(
4393                room_alias_id!("#some_alias:matrix.org"),
4394                room_id!("!some_room:matrix.org"),
4395            )
4396            .await;
4397        assert_matches!(ret, Ok(()));
4398    }
4399
4400    #[async_test]
4401    async fn test_join_room_by_id_or_alias() {
4402        use wiremock::{
4403            Mock, ResponseTemplate,
4404            matchers::{method, path_regex},
4405        };
4406        let server = MatrixMockServer::new().await;
4407        let client = server.client_builder().build().await;
4408
4409        let target_room_id = room_id!("!some_id:matrix.org");
4410        let target_alias = room_alias_id!("#some_alias:matrix.org");
4411
4412        Mock::given(method("POST"))
4413            .and(path_regex("^/_matrix/client/v3/join/.*$"))
4414            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
4415                "room_id": target_room_id
4416            })))
4417            .mount(server.server())
4418            .await;
4419
4420        server
4421            .mock_room_directory_resolve_alias()
4422            .ok(target_room_id.as_str(), Vec::new())
4423            .mount()
4424            .await;
4425
4426        server.mock_room_join(target_room_id).ok().mount().await;
4427
4428        let ret = client.join_room_by_id_or_alias(target_alias.into(), &[]).await;
4429        assert!(ret.is_ok());
4430
4431        let ret = client.join_room_by_id_or_alias(target_room_id.into(), &[]).await;
4432        assert!(ret.is_ok());
4433    }
4434
4435    #[async_test]
4436    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4437        let server = MatrixMockServer::new().await;
4438        let client = server.client_builder().build().await;
4439
4440        let room_id = room_id!("!a-room:matrix.org");
4441
4442        // Make sure the summary endpoint is called once
4443        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4444
4445        // We create a locally cached invited room
4446        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4447
4448        // And we get a preview, the server endpoint was reached
4449        let preview = client
4450            .get_room_preview(room_id.into(), Vec::new())
4451            .await
4452            .expect("Room preview should be retrieved");
4453
4454        assert_eq!(invited_room.room_id(), preview.room_id);
4455    }
4456
4457    #[async_test]
4458    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4459        let server = MatrixMockServer::new().await;
4460        let client = server.client_builder().build().await;
4461
4462        let room_id = room_id!("!a-room:matrix.org");
4463
4464        // Make sure the summary endpoint is called once
4465        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4466
4467        // We create a locally cached left room
4468        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4469
4470        // And we get a preview, the server endpoint was reached
4471        let preview = client
4472            .get_room_preview(room_id.into(), Vec::new())
4473            .await
4474            .expect("Room preview should be retrieved");
4475
4476        assert_eq!(left_room.room_id(), preview.room_id);
4477    }
4478
4479    #[async_test]
4480    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4481        let server = MatrixMockServer::new().await;
4482        let client = server.client_builder().build().await;
4483
4484        let room_id = room_id!("!a-room:matrix.org");
4485
4486        // Make sure the summary endpoint is called once
4487        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4488
4489        // We create a locally cached knocked room
4490        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4491
4492        // And we get a preview, the server endpoint was reached
4493        let preview = client
4494            .get_room_preview(room_id.into(), Vec::new())
4495            .await
4496            .expect("Room preview should be retrieved");
4497
4498        assert_eq!(knocked_room.room_id(), preview.room_id);
4499    }
4500
4501    #[async_test]
4502    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4503        let server = MatrixMockServer::new().await;
4504        let client = server.client_builder().build().await;
4505
4506        let room_id = room_id!("!a-room:matrix.org");
4507
4508        // Make sure the summary endpoint is not called
4509        server.mock_room_summary().ok(room_id).never().mount().await;
4510
4511        // We create a locally cached joined room
4512        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4513
4514        // And we get a preview, no server endpoint was reached
4515        let preview = client
4516            .get_room_preview(room_id.into(), Vec::new())
4517            .await
4518            .expect("Room preview should be retrieved");
4519
4520        assert_eq!(joined_room.room_id(), preview.room_id);
4521    }
4522
4523    #[async_test]
4524    async fn test_media_preview_config() {
4525        let server = MatrixMockServer::new().await;
4526        let client = server.client_builder().build().await;
4527
4528        server
4529            .mock_sync()
4530            .ok_and_run(&client, |builder| {
4531                builder.add_custom_global_account_data(json!({
4532                    "content": {
4533                        "media_previews": "private",
4534                        "invite_avatars": "off"
4535                    },
4536                    "type": "m.media_preview_config"
4537                }));
4538            })
4539            .await;
4540
4541        let (initial_value, stream) =
4542            client.account().observe_media_preview_config().await.unwrap();
4543
4544        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4545        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4546        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4547        pin_mut!(stream);
4548        assert_pending!(stream);
4549
4550        server
4551            .mock_sync()
4552            .ok_and_run(&client, |builder| {
4553                builder.add_custom_global_account_data(json!({
4554                    "content": {
4555                        "media_previews": "off",
4556                        "invite_avatars": "on"
4557                    },
4558                    "type": "m.media_preview_config"
4559                }));
4560            })
4561            .await;
4562
4563        assert_next_matches!(
4564            stream,
4565            MediaPreviewConfigEventContent {
4566                media_previews: Some(MediaPreviews::Off),
4567                invite_avatars: Some(InviteAvatars::On),
4568                ..
4569            }
4570        );
4571        assert_pending!(stream);
4572    }
4573
4574    #[async_test]
4575    async fn test_unstable_media_preview_config() {
4576        let server = MatrixMockServer::new().await;
4577        let client = server.client_builder().build().await;
4578
4579        server
4580            .mock_sync()
4581            .ok_and_run(&client, |builder| {
4582                builder.add_custom_global_account_data(json!({
4583                    "content": {
4584                        "media_previews": "private",
4585                        "invite_avatars": "off"
4586                    },
4587                    "type": "io.element.msc4278.media_preview_config"
4588                }));
4589            })
4590            .await;
4591
4592        let (initial_value, stream) =
4593            client.account().observe_media_preview_config().await.unwrap();
4594
4595        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4596        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4597        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4598        pin_mut!(stream);
4599        assert_pending!(stream);
4600
4601        server
4602            .mock_sync()
4603            .ok_and_run(&client, |builder| {
4604                builder.add_custom_global_account_data(json!({
4605                    "content": {
4606                        "media_previews": "off",
4607                        "invite_avatars": "on"
4608                    },
4609                    "type": "io.element.msc4278.media_preview_config"
4610                }));
4611            })
4612            .await;
4613
4614        assert_next_matches!(
4615            stream,
4616            MediaPreviewConfigEventContent {
4617                media_previews: Some(MediaPreviews::Off),
4618                invite_avatars: Some(InviteAvatars::On),
4619                ..
4620            }
4621        );
4622        assert_pending!(stream);
4623    }
4624
4625    #[async_test]
4626    async fn test_media_preview_config_not_found() {
4627        let server = MatrixMockServer::new().await;
4628        let client = server.client_builder().build().await;
4629
4630        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4631
4632        assert!(initial_value.is_none());
4633    }
4634
4635    #[async_test]
4636    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4637        // The default Matrix version we use is 1.11 or higher, so authenticated media
4638        // is supported.
4639        let server = MatrixMockServer::new().await;
4640        let client = server.client_builder().build().await;
4641
4642        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4643
4644        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4645        client.load_or_fetch_max_upload_size().await.unwrap();
4646
4647        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4648    }
4649
4650    #[async_test]
4651    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4652        // The server must advertise support for the stable feature for authenticated
4653        // media support, so we mock the `GET /versions` response.
4654        let server = MatrixMockServer::new().await;
4655        let client = server.client_builder().no_server_versions().build().await;
4656
4657        server
4658            .mock_versions()
4659            .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4660            .with_feature("org.matrix.msc3916.stable", true)
4661            .ok()
4662            .named("versions")
4663            .expect(1)
4664            .mount()
4665            .await;
4666
4667        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4668
4669        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4670        client.load_or_fetch_max_upload_size().await.unwrap();
4671
4672        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4673    }
4674
4675    #[async_test]
4676    async fn test_load_or_fetch_max_upload_size_no_auth() {
4677        // The server must not support Matrix 1.11 or higher for unauthenticated
4678        // media requests, so we mock the `GET /versions` response.
4679        let server = MatrixMockServer::new().await;
4680        let client = server.client_builder().no_server_versions().build().await;
4681
4682        server
4683            .mock_versions()
4684            .with_versions(vec!["v1.1"])
4685            .ok()
4686            .named("versions")
4687            .expect(1)
4688            .mount()
4689            .await;
4690
4691        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4692
4693        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4694        client.load_or_fetch_max_upload_size().await.unwrap();
4695
4696        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4697    }
4698
4699    #[async_test]
4700    async fn test_uploading_a_too_large_media_file() {
4701        let server = MatrixMockServer::new().await;
4702        let client = server.client_builder().build().await;
4703
4704        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4705        client.load_or_fetch_max_upload_size().await.unwrap();
4706        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4707
4708        let data = vec![1, 2];
4709        let upload_request =
4710            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4711        let request = SendRequest {
4712            client: client.clone(),
4713            request: upload_request,
4714            config: None,
4715            send_progress: SharedObservable::new(TransmissionProgress::default()),
4716        };
4717        let media_request = SendMediaUploadRequest::new(request);
4718
4719        let error = media_request.await.err();
4720        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4721        assert_eq!(max, uint!(1));
4722        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4723    }
4724
4725    #[async_test]
4726    async fn test_dont_ignore_timeout_on_first_sync() {
4727        let server = MatrixMockServer::new().await;
4728        let client = server.client_builder().build().await;
4729
4730        server
4731            .mock_sync()
4732            .timeout(Some(Duration::from_secs(30)))
4733            .ok(|_| {})
4734            .mock_once()
4735            .named("sync_with_timeout")
4736            .mount()
4737            .await;
4738
4739        // Call the endpoint once to check the timeout.
4740        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4741
4742        timeout(Duration::from_secs(1), async {
4743            stream.next().await.unwrap().unwrap();
4744        })
4745        .await
4746        .unwrap();
4747    }
4748
4749    #[async_test]
4750    async fn test_ignore_timeout_on_first_sync() {
4751        let server = MatrixMockServer::new().await;
4752        let client = server.client_builder().build().await;
4753
4754        server
4755            .mock_sync()
4756            .timeout(None)
4757            .ok(|_| {})
4758            .mock_once()
4759            .named("sync_no_timeout")
4760            .mount()
4761            .await;
4762        server
4763            .mock_sync()
4764            .timeout(Some(Duration::from_secs(30)))
4765            .ok(|_| {})
4766            .mock_once()
4767            .named("sync_with_timeout")
4768            .mount()
4769            .await;
4770
4771        // Call each version of the endpoint once to check the timeouts.
4772        let mut stream = Box::pin(
4773            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4774        );
4775
4776        timeout(Duration::from_secs(1), async {
4777            stream.next().await.unwrap().unwrap();
4778            stream.next().await.unwrap().unwrap();
4779        })
4780        .await
4781        .unwrap();
4782    }
4783
4784    #[async_test]
4785    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4786        let server = MatrixMockServer::new().await;
4787        let client = server.client_builder().build().await;
4788        // This is the user ID that is inside MemberAdditional.
4789        // Note the confusing username, so we can share
4790        // GlobalAccountDataTestEvent::Direct with the invited test.
4791        let user_id = user_id!("@invited:localhost");
4792
4793        // When we receive a sync response saying "invited" is invited to a DM
4794        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4795        let response = SyncResponseBuilder::default()
4796            .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4797            .add_global_account_data(
4798                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4799            )
4800            .build_sync_response();
4801        client.base_client().receive_sync_response(response).await.unwrap();
4802
4803        // Then get_dm_room finds this room
4804        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4805        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4806    }
4807
4808    #[async_test]
4809    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4810        let server = MatrixMockServer::new().await;
4811        let client = server.client_builder().build().await;
4812        // This is the user ID that is inside MemberInvite
4813        let user_id = user_id!("@invited:localhost");
4814
4815        // When we receive a sync response saying "invited" is invited to a DM
4816        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4817        let response = SyncResponseBuilder::default()
4818            .add_joined_room(
4819                JoinedRoomBuilder::default()
4820                    .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4821            )
4822            .add_global_account_data(
4823                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4824            )
4825            .build_sync_response();
4826        client.base_client().receive_sync_response(response).await.unwrap();
4827
4828        // Then get_dm_room finds this room
4829        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4830        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4831    }
4832
4833    #[async_test]
4834    async fn test_get_dm_room_still_finds_left_room() {
4835        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4836        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4837
4838        let server = MatrixMockServer::new().await;
4839        let client = server.client_builder().build().await;
4840        // This is the user ID that is inside MemberAdditional.
4841        // Note the confusing username, so we can share
4842        // GlobalAccountDataTestEvent::Direct with the invited test.
4843        let user_id = user_id!("@invited:localhost");
4844
4845        // When we receive a sync response saying "invited" has left a DM
4846        let f = EventFactory::new().sender(user_id);
4847        let response = SyncResponseBuilder::default()
4848            .add_joined_room(
4849                JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4850            )
4851            .add_global_account_data(
4852                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4853            )
4854            .build_sync_response();
4855        client.base_client().receive_sync_response(response).await.unwrap();
4856
4857        // Then get_dm_room finds this room
4858        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4859        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4860    }
4861
4862    #[async_test]
4863    async fn test_device_exists() {
4864        let server = MatrixMockServer::new().await;
4865        let client = server.client_builder().build().await;
4866
4867        server.mock_get_device().ok().expect(1).mount().await;
4868
4869        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4870    }
4871
4872    #[async_test]
4873    async fn test_device_exists_404() {
4874        let server = MatrixMockServer::new().await;
4875        let client = server.client_builder().build().await;
4876
4877        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4878    }
4879
4880    #[async_test]
4881    async fn test_device_exists_500() {
4882        let server = MatrixMockServer::new().await;
4883        let client = server.client_builder().build().await;
4884
4885        server.mock_get_device().error500().expect(1).mount().await;
4886
4887        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4888    }
4889
4890    #[async_test]
4891    async fn test_fetching_well_known_with_homeserver_url() {
4892        let server = MatrixMockServer::new().await;
4893        let client = server.client_builder().build().await;
4894        server.mock_well_known().ok().mount().await;
4895
4896        assert_matches!(client.fetch_client_well_known().await, Some(_));
4897    }
4898
4899    #[async_test]
4900    async fn test_fetching_well_known_with_server_name() {
4901        let server = MatrixMockServer::new().await;
4902        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4903
4904        server.mock_well_known().ok().mount().await;
4905
4906        let client = MockClientBuilder::new(None)
4907            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4908            .build()
4909            .await;
4910
4911        assert_matches!(client.fetch_client_well_known().await, Some(_));
4912    }
4913
4914    #[async_test]
4915    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4916        let server = MatrixMockServer::new().await;
4917        server.mock_well_known().ok().mount().await;
4918
4919        let user_id =
4920            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4921        let client = MockClientBuilder::new(None)
4922            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4923            .build()
4924            .await;
4925
4926        assert_matches!(client.fetch_client_well_known().await, Some(_));
4927    }
4928}