Skip to main content

matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::{StreamExt, join};
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32    DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35    BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36    SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37    SyncOutsideWasm, ThreadingSupport,
38    event_cache::store::EventCacheStoreLock,
39    media::store::MediaStoreLock,
40    store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41    sync::{Notification, RoomUpdates},
42    task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50    api::{
51        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52        client::{
53            account::whoami,
54            alias::{create_alias, delete_alias, get_alias},
55            authenticated_media,
56            device::{self, delete_devices, get_devices, update_device},
57            directory::{get_public_rooms, get_public_rooms_filtered},
58            discovery::{
59                discover_homeserver::{self, RtcFocusInfo},
60                get_supported_versions,
61            },
62            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
63            knock::knock_room,
64            media,
65            membership::{join_room_by_id, join_room_by_id_or_alias},
66            room::create_room,
67            session::login::v3::DiscoveryInfo,
68            sync::sync_events,
69            threads::get_thread_subscriptions_changes,
70            uiaa,
71            user_directory::search_users,
72        },
73        error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
74        path_builder::PathBuilder,
75    },
76    assign,
77    events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
78    push::Ruleset,
79    time::Instant,
80};
81use serde::de::DeserializeOwned;
82use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
83use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
84use url::Url;
85
86use self::{
87    caches::{Cache, CachedValue, ClientCaches},
88    futures::SendRequest,
89};
90use crate::{
91    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
92    Room, SessionTokens, TransmissionProgress,
93    authentication::{
94        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
95        oauth::OAuth,
96    },
97    client::{
98        homeserver_capabilities::HomeserverCapabilities,
99        thread_subscriptions::ThreadSubscriptionCatchup,
100    },
101    config::{RequestConfig, SyncToken},
102    deduplicating_handler::DeduplicatingHandler,
103    error::HttpResult,
104    event_cache::EventCache,
105    event_handler::{
106        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
107        EventHandlerStore, ObservableEventHandler, SyncEvent,
108    },
109    http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
110    latest_events::LatestEvents,
111    live_locations_observer::BeaconInfoUpdate,
112    media::MediaError,
113    notification_settings::NotificationSettings,
114    room::RoomMember,
115    room_preview::RoomPreview,
116    send_queue::{SendQueue, SendQueueData},
117    sliding_sync::Version as SlidingSyncVersion,
118    sync::{RoomUpdate, SyncResponse},
119};
120#[cfg(feature = "e2e-encryption")]
121use crate::{
122    cross_process_lock::CrossProcessLock,
123    encryption::{
124        DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
125        VerificationState,
126    },
127};
128
129mod builder;
130pub(crate) mod caches;
131pub(crate) mod futures;
132pub(crate) mod homeserver_capabilities;
133pub(crate) mod thread_subscriptions;
134
135pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
136#[cfg(feature = "experimental-search")]
137use crate::search_index::SearchIndex;
138
/// Boxed, pinned future returned by a notification handler.
///
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a notification handler (wasm flavour,
/// without the `Send` bound).
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

/// Boxed notification handler: a callable taking a [`Notification`], a
/// [`Room`] and a [`Client`], and returning a [`NotificationHandlerFut`].
///
/// On non-wasm targets the handler must be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler (wasm flavour, without the `Send + Sync`
/// bounds).
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
149
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
163
/// Represents changes that can occur to a `Client`'s `Session`.
#[derive(Debug, Clone, PartialEq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    ///
    /// Carries the [`UnknownTokenErrorData`] describing the error.
    UnknownToken(UnknownTokenErrorData),
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
172
/// Information about the server vendor obtained from the federation API.
///
/// Produced by `Client::server_vendor_info`, which substitutes the string
/// `"unknown"` for any field the server did not report.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The name of the server software.
    pub server_name: String,
    /// The version of the server software.
    pub version: String,
}
182
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely;
/// every clone shares the same `ClientInner`.
#[derive(Clone)]
pub struct Client {
    /// The shared state backing this client and all of its clones.
    pub(crate) inner: Arc<ClientInner>,
}
190
/// Collection of locks and request-deduplicating handlers that individual
/// client methods use, either to ensure that only a single call to a method
/// happens at once or to deduplicate multiple calls to a method.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): `Self` is `ClientLocks` here, which has no
    // `send_single_receipt` method in this file; the link target presumably
    // lives on another type. Confirm and fix the link.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock over the crypto store.
    ///
    /// Stored in a `OnceCell`, so it starts out empty and is set at most once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
269
/// The shared state of a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`, so every clone of a `Client`
/// observes the same fields listed here.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    ///
    /// Kept behind a lock because the URL can be replaced after construction.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    ///
    /// Kept behind a lock because it can be overridden after construction.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process lock configuration.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
    /// [`CrossProcessLockConfig::MultiProcess`] is used.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_lock_config: CrossProcessLockConfig,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    ///
    /// One broadcast sender is kept per room, keyed by room ID.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    /// Handler for [`RoomIndex`]'s of each room
    #[cfg(feature = "experimental-search")]
    search_index: SearchIndex,

    /// A monitor for background tasks spawned by the client.
    pub(crate) task_monitor: TaskMonitor,

    /// A sender to notify subscribers about duplicate key upload errors
    /// triggered by requests to /keys/upload.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) duplicate_key_upload_error_sender:
        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
}
398
399impl ClientInner {
400    /// Create a new `ClientInner`.
401    ///
402    /// All the fields passed as parameters here are those that must be cloned
403    /// upon instantiation of a sub-client, e.g. a client specialized for
404    /// notifications.
405    #[allow(clippy::too_many_arguments)]
406    async fn new(
407        auth_ctx: Arc<AuthCtx>,
408        server: Option<Url>,
409        homeserver: Url,
410        sliding_sync_version: SlidingSyncVersion,
411        http_client: HttpClient,
412        base_client: BaseClient,
413        supported_versions: CachedValue<TtlValue<SupportedVersions>>,
414        well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
415        respect_login_well_known: bool,
416        event_cache: OnceCell<EventCache>,
417        send_queue: Arc<SendQueueData>,
418        latest_events: OnceCell<LatestEvents>,
419        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
420        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
421        cross_process_lock_config: CrossProcessLockConfig,
422        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
423        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
424    ) -> Arc<Self> {
425        let caches = ClientCaches {
426            supported_versions: Cache::with_value(supported_versions),
427            well_known: Cache::with_value(well_known),
428            server_metadata: Cache::new(),
429            homeserver_capabilities: Cache::new(),
430        };
431
432        let client = Self {
433            server,
434            homeserver: StdRwLock::new(homeserver),
435            auth_ctx,
436            sliding_sync_version: StdRwLock::new(sliding_sync_version),
437            http_client,
438            base_client,
439            caches,
440            locks: Default::default(),
441            cross_process_lock_config,
442            typing_notice_times: Default::default(),
443            event_handlers: Default::default(),
444            notification_handlers: Default::default(),
445            room_update_channels: Default::default(),
446            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
447            // ballast for all observers to catch up.
448            room_updates_sender: broadcast::Sender::new(32),
449            respect_login_well_known,
450            sync_beat: event_listener::Event::new(),
451            event_cache,
452            send_queue_data: send_queue,
453            latest_events,
454            #[cfg(feature = "e2e-encryption")]
455            e2ee: EncryptionData::new(encryption_settings),
456            #[cfg(feature = "e2e-encryption")]
457            verification_state: SharedObservable::new(VerificationState::Unknown),
458            #[cfg(feature = "e2e-encryption")]
459            enable_share_history_on_invite,
460            server_max_upload_size: Mutex::new(OnceCell::new()),
461            #[cfg(feature = "experimental-search")]
462            search_index: search_index_handler,
463            thread_subscription_catchup,
464            task_monitor: TaskMonitor::new(),
465            #[cfg(feature = "e2e-encryption")]
466            duplicate_key_upload_error_sender: broadcast::channel(1).0,
467        };
468
469        #[allow(clippy::let_and_return)]
470        let client = Arc::new(client);
471
472        #[cfg(feature = "e2e-encryption")]
473        client.e2ee.initialize_tasks(&client);
474
475        let init_event_cache = client.event_cache.get_or_init(|| async {
476            EventCache::new(&client, client.base_client.event_cache_store().clone())
477        });
478
479        let init_thread_subscription_catchup = client
480            .thread_subscription_catchup
481            .get_or_init(|| ThreadSubscriptionCatchup::new(Client { inner: client.clone() }));
482
483        let _ = join!(init_event_cache, init_thread_subscription_catchup);
484
485        client
486    }
487}
488
489#[cfg(not(tarpaulin_include))]
490impl Debug for Client {
491    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
492        write!(fmt, "Client")
493    }
494}
495
496impl Client {
497    /// Create a new [`Client`] that will use the given homeserver.
498    ///
499    /// # Arguments
500    ///
501    /// * `homeserver_url` - The homeserver that the client should connect to.
502    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
503        Self::builder().homeserver_url(homeserver_url).build().await
504    }
505
506    /// Returns a subscriber that publishes an event every time the ignore user
507    /// list changes.
508    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
509        self.inner.base_client.subscribe_to_ignore_user_list_changes()
510    }
511
    /// Create a new [`ClientBuilder`].
    ///
    /// This is a shorthand for [`ClientBuilder::new()`].
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
516
    /// Borrow the [`BaseClient`] held by this client.
    pub(crate) fn base_client(&self) -> &BaseClient {
        &self.inner.base_client
    }
520
    /// The underlying HTTP client.
    ///
    /// This borrows the `reqwest::Client` wrapped by the SDK's internal HTTP
    /// client.
    pub fn http_client(&self) -> &reqwest::Client {
        &self.inner.http_client.inner
    }
525
    /// Borrow the [`ClientLocks`] shared by all clones of this client.
    pub(crate) fn locks(&self) -> &ClientLocks {
        &self.inner.locks
    }
529
    /// Borrow the authentication and authorization data of this client.
    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
        &self.inner.auth_ctx
    }
533
    /// The cross-process store lock configuration used by this [`Client`].
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
    /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
    /// the value used for all cross-process store locks used by this
    /// `Client`.
    ///
    /// The configuration is fixed when the client is constructed; this getter
    /// only borrows it.
    pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
        &self.inner.cross_process_lock_config
    }
544
545    /// Change the homeserver URL used by this client.
546    ///
547    /// # Arguments
548    ///
549    /// * `homeserver_url` - The new URL to use.
550    fn set_homeserver(&self, homeserver_url: Url) {
551        *self.inner.homeserver.write().unwrap() = homeserver_url;
552    }
553
554    /// Change to a different homeserver and re-resolve well-known.
555    #[cfg(feature = "e2e-encryption")]
556    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
557        &self,
558        homeserver_url: Url,
559    ) -> Result<()> {
560        self.set_homeserver(homeserver_url);
561        self.reset_well_known().await?;
562        if let Some(well_known) = self.well_known().await {
563            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
564        }
565        Ok(())
566    }
567
    /// Retrieves a helper component to access the [`HomeserverCapabilities`]
    /// supported or disabled by the homeserver.
    ///
    /// The helper is built from a clone of this `Client`.
    pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
        HomeserverCapabilities::new(self.clone())
    }
573
574    /// Get the server vendor information from the federation API.
575    ///
576    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
577    /// both the server's software name and version.
578    ///
579    /// # Examples
580    ///
581    /// ```no_run
582    /// # use matrix_sdk::Client;
583    /// # use url::Url;
584    /// # async {
585    /// # let homeserver = Url::parse("http://example.com")?;
586    /// let client = Client::new(homeserver).await?;
587    ///
588    /// let server_info = client.server_vendor_info(None).await?;
589    /// println!(
590    ///     "Server: {}, Version: {}",
591    ///     server_info.server_name, server_info.version
592    /// );
593    /// # anyhow::Ok(()) };
594    /// ```
595    #[cfg(feature = "federation-api")]
596    pub async fn server_vendor_info(
597        &self,
598        request_config: Option<RequestConfig>,
599    ) -> HttpResult<ServerVendorInfo> {
600        use ruma::api::federation::discovery::get_server_version;
601
602        let res = self
603            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
604            .await?;
605
606        // Extract server info, using defaults if fields are missing.
607        let server = res.server.unwrap_or_default();
608        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
609        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
610
611        Ok(ServerVendorInfo { server_name: server_name_str, version })
612    }
613
    /// Get a copy of the default request config.
    ///
    /// The default request config is what's used when sending requests if no
    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
    /// function with such a parameter.
    ///
    /// If the default request config was not customized through
    /// [`ClientBuilder`] when creating this `Client`, the returned value will
    /// be equivalent to [`RequestConfig::default()`].
    ///
    /// The value is returned by copy, so changing it has no effect on the
    /// client.
    pub fn request_config(&self) -> RequestConfig {
        self.inner.http_client.request_config
    }
626
627    /// Check whether the client has been activated.
628    ///
629    /// A client is considered active when:
630    ///
631    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
632    ///    is logged in,
633    /// 2. Has loaded cached data from storage,
634    /// 3. If encryption is enabled, it also initialized or restored its
635    ///    `OlmMachine`.
636    pub fn is_active(&self) -> bool {
637        self.inner.base_client.is_active()
638    }
639
    /// The server used by the client, if known.
    ///
    /// The server is usually the server part of a user ID and is not to be
    /// confused with the homeserver, see [`Client::homeserver`] for the
    /// latter.
    ///
    /// Returns `None` when the `Client` has been built from a homeserver URL
    /// directly, since the server isn't known in that case.
    pub fn server(&self) -> Option<&Url> {
        self.inner.server.as_ref()
    }
646
647    /// The homeserver of the client.
648    pub fn homeserver(&self) -> Url {
649        self.inner.homeserver.read().unwrap().clone()
650    }
651
652    /// Get the sliding sync version.
653    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
654        self.inner.sliding_sync_version.read().unwrap().clone()
655    }
656
657    /// Override the sliding sync version.
658    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
659        let mut lock = self.inner.sliding_sync_version.write().unwrap();
660        *lock = version;
661    }
662
    /// Get the Matrix user session meta information.
    ///
    /// If the client is currently logged in, this will return a
    /// [`SessionMeta`] object which contains the user ID and device ID.
    /// Otherwise it returns `None`.
    ///
    /// The information is read from the underlying `BaseClient`.
    pub fn session_meta(&self) -> Option<&SessionMeta> {
        self.base_client().session_meta()
    }
671
    /// Returns a receiver that gets events for each room info update. To watch
    /// for new events, use `receiver.resubscribe()`.
    ///
    /// The receiver is obtained from the underlying `BaseClient`.
    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
        self.base_client().room_info_notable_update_receiver()
    }
677
678    /// Performs a search for users.
679    /// The search is performed case-insensitively on user IDs and display names
680    ///
681    /// # Arguments
682    ///
683    /// * `search_term` - The search term for the search
684    /// * `limit` - The maximum number of results to return. Defaults to 10.
685    ///
686    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
687    pub async fn search_users(
688        &self,
689        search_term: &str,
690        limit: u64,
691    ) -> HttpResult<search_users::v3::Response> {
692        let mut request = search_users::v3::Request::new(search_term.to_owned());
693
694        if let Some(limit) = UInt::new(limit) {
695            request.limit = limit;
696        }
697
698        self.send(request).await
699    }
700
701    /// Get the user id of the current owner of the client.
702    pub fn user_id(&self) -> Option<&UserId> {
703        self.session_meta().map(|s| s.user_id.as_ref())
704    }
705
706    /// Get the device ID that identifies the current session.
707    pub fn device_id(&self) -> Option<&DeviceId> {
708        self.session_meta().map(|s| s.device_id.as_ref())
709    }
710
    /// Get the current access token for this session.
    ///
    /// Will be `None` if the client has not been logged in.
    ///
    /// The token is returned as an owned `String`, read from the client's
    /// authentication context.
    pub fn access_token(&self) -> Option<String> {
        self.auth_ctx().access_token()
    }
717
    /// Get the current tokens for this session.
    ///
    /// To be notified of changes in the session tokens, use
    /// [`Client::subscribe_to_session_changes()`] or
    /// [`Client::set_session_callbacks()`].
    ///
    /// Returns `None` if the client has not been logged in.
    ///
    /// The tokens are read from the client's authentication context.
    pub fn session_tokens(&self) -> Option<SessionTokens> {
        self.auth_ctx().session_tokens()
    }
728
729    /// Access the authentication API used to log in this client.
730    ///
731    /// Will be `None` if the client has not been logged in.
732    pub fn auth_api(&self) -> Option<AuthApi> {
733        match self.auth_ctx().auth_data.get()? {
734            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
735            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
736        }
737    }
738
739    /// Get the whole session info of this client.
740    ///
741    /// Will be `None` if the client has not been logged in.
742    ///
743    /// Can be used with [`Client::restore_session`] to restore a previously
744    /// logged-in session.
745    pub fn session(&self) -> Option<AuthSession> {
746        match self.auth_api()? {
747            AuthApi::Matrix(api) => api.session().map(Into::into),
748            AuthApi::OAuth(api) => api.full_session().map(Into::into),
749        }
750    }
751
    /// Get a reference to the state store.
    ///
    /// The store is the one held by the underlying `BaseClient`.
    pub fn state_store(&self) -> &DynStateStore {
        self.base_client().state_store()
    }
756
    /// Get a reference to the event cache store.
    ///
    /// The store is the one held by the underlying `BaseClient`; it is
    /// handed out as an [`EventCacheStoreLock`].
    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
        self.base_client().event_cache_store()
    }
761
    /// Get a reference to the media store.
    ///
    /// The store is the one held by the underlying `BaseClient`; it is
    /// handed out as a [`MediaStoreLock`].
    pub fn media_store(&self) -> &MediaStoreLock {
        self.base_client().media_store()
    }
766
    /// Access the native Matrix authentication API with this client.
    ///
    /// The returned [`MatrixAuth`] is built from a clone of this `Client`.
    pub fn matrix_auth(&self) -> MatrixAuth {
        MatrixAuth::new(self.clone())
    }
771
    /// Get the account of the current owner of the client.
    ///
    /// The returned [`Account`] is built from a clone of this `Client`.
    pub fn account(&self) -> Account {
        Account::new(self.clone())
    }
776
    /// Get the encryption manager of the client.
    ///
    /// The returned [`Encryption`] is built from a clone of this `Client`.
    /// Only available with the `e2e-encryption` feature.
    #[cfg(feature = "e2e-encryption")]
    pub fn encryption(&self) -> Encryption {
        Encryption::new(self.clone())
    }
782
783    /// Get the media manager of the client.
784    pub fn media(&self) -> Media {
785        Media::new(self.clone())
786    }
787
788    /// Get the pusher manager of the client.
789    pub fn pusher(&self) -> Pusher {
790        Pusher::new(self.clone())
791    }
792
793    /// Access the OAuth 2.0 API of the client.
794    pub fn oauth(&self) -> OAuth {
795        OAuth::new(self.clone())
796    }
797
798    /// Register a handler for a specific event type.
799    ///
800    /// The handler is a function or closure with one or more arguments. The
801    /// first argument is the event itself. All additional arguments are
802    /// "context" arguments: They have to implement [`EventHandlerContext`].
803    /// This trait is named that way because most of the types implementing it
804    /// give additional context about an event: The room it was in, its raw form
805    /// and other similar things. As two exceptions to this,
806    /// [`Client`] and [`EventHandlerHandle`] also implement the
807    /// `EventHandlerContext` trait so you don't have to clone your client
808    /// into the event handler manually and a handler can decide to remove
809    /// itself.
810    ///
811    /// Some context arguments are not universally applicable. A context
812    /// argument that isn't available for the given event type will result in
813    /// the event handler being skipped and an error being logged. The following
814    /// context argument types are only available for a subset of event types:
815    ///
816    /// * [`Room`] is only available for room-specific events, i.e. not for
817    ///   events like global account data events or presence events.
818    ///
819    /// You can provide custom context via
820    /// [`add_event_handler_context`](Client::add_event_handler_context) and
821    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
822    /// into the event handler.
823    ///
824    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
825    ///
826    /// # Examples
827    ///
828    /// ```no_run
829    /// use matrix_sdk::{
830    ///     deserialized_responses::EncryptionInfo,
831    ///     event_handler::Ctx,
832    ///     ruma::{
833    ///         events::{
834    ///             macros::EventContent,
835    ///             push_rules::PushRulesEvent,
836    ///             room::{
837    ///                 message::SyncRoomMessageEvent,
838    ///                 topic::SyncRoomTopicEvent,
839    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
840    ///             },
841    ///         },
842    ///         push::Action,
843    ///         Int, MilliSecondsSinceUnixEpoch,
844    ///     },
845    ///     Client, Room,
846    /// };
847    /// use serde::{Deserialize, Serialize};
848    ///
849    /// # async fn example(client: Client) {
850    /// client.add_event_handler(
851    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
852    ///         // Common usage: Room event plus room and client.
853    ///     },
854    /// );
855    /// client.add_event_handler(
856    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
857    ///         async move {
858    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
859    ///             // unencrypted events and events that were decrypted by the SDK.
860    ///         }
861    ///     },
862    /// );
863    /// client.add_event_handler(
864    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
865    ///         async move {
866    ///             // A `Vec<Action>` parameter allows you to know which push actions
867    ///             // are applicable for an event. For example, an event with
868    ///             // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
869    ///             // should be highlighted in the timeline.
870    ///         }
871    ///     },
872    /// );
873    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
874    ///     // You can omit any or all arguments after the first.
875    /// });
876    ///
877    /// // Registering a temporary event handler:
878    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
879    ///     /* Event handler */
880    /// });
881    /// client.remove_event_handler(handle);
882    ///
883    /// // Registering custom event handler context:
884    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
885    /// struct MyContext {
886    ///     number: usize,
887    /// }
888    /// client.add_event_handler_context(MyContext { number: 5 });
889    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
890    ///     // Use the context
891    /// });
892    ///
893    /// // This will handle membership events in joined rooms. Invites are special, see below.
894    /// client.add_event_handler(
895    ///     |ev: SyncRoomMemberEvent| async move {},
896    /// );
897    ///
898    /// // To handle state events in invited rooms (including invite membership events),
899    /// // `StrippedRoomMemberEvent` should be used.
900    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
901    /// client.add_event_handler(
902    ///     |ev: StrippedRoomMemberEvent| async move {},
903    /// );
904    ///
905    /// // Custom events work exactly the same way, you just need to declare
906    /// // the content struct and use the EventContent derive macro on it.
907    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
908    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
909    /// struct TokenEventContent {
910    ///     token: String,
911    ///     #[serde(rename = "exp")]
912    ///     expires_at: MilliSecondsSinceUnixEpoch,
913    /// }
914    ///
915    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
916    ///     todo!("Display the token");
917    /// });
918    ///
919    /// // Event handler closures can also capture local variables.
920    /// // Make sure they are cheap to clone though, because they will be cloned
921    /// // every time the closure is called.
922    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
923    ///
924    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
925    ///     println!("Calling the handler with identifier {data}");
926    /// });
927    /// # }
928    /// ```
929    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
930    where
931        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
932        H: EventHandler<Ev, Ctx>,
933    {
934        self.add_event_handler_impl(handler, None)
935    }
936
937    /// Register a handler for a specific room, and event type.
938    ///
939    /// This method works the same way as
940    /// [`add_event_handler`][Self::add_event_handler], except that the handler
941    /// will only be called for events in the room with the specified ID. See
942    /// that method for more details on event handler functions.
943    ///
944    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
945    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
946    /// your use case.
947    pub fn add_room_event_handler<Ev, Ctx, H>(
948        &self,
949        room_id: &RoomId,
950        handler: H,
951    ) -> EventHandlerHandle
952    where
953        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
954        H: EventHandler<Ev, Ctx>,
955    {
956        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
957    }
958
959    /// Observe a specific event type.
960    ///
961    /// `Ev` represents the kind of event that will be observed. `Ctx`
962    /// represents the context that will come with the event. It relies on the
963    /// same mechanism as [`Client::add_event_handler`]. The main difference is
964    /// that it returns an [`ObservableEventHandler`] and doesn't require a
965    /// user-defined closure. It is possible to subscribe to the
966    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
967    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
968    /// Ctx)`.
969    ///
970    /// Be careful that only the most recent value can be observed. Subscribers
971    /// are notified when a new value is sent, but there is no guarantee
972    /// that they will see all values.
973    ///
974    /// # Example
975    ///
976    /// Let's see a classical usage:
977    ///
978    /// ```
979    /// use futures_util::StreamExt as _;
980    /// use matrix_sdk::{
981    ///     Client, Room,
982    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
983    /// };
984    ///
985    /// # async fn example(client: Client) -> Option<()> {
986    /// let observer =
987    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
988    ///
989    /// let mut subscriber = observer.subscribe();
990    ///
991    /// let (event, (room, push_actions)) = subscriber.next().await?;
992    /// # Some(())
993    /// # }
994    /// ```
995    ///
996    /// Now let's see how to get several contexts that can be useful for you:
997    ///
998    /// ```
999    /// use matrix_sdk::{
1000    ///     Client, Room,
1001    ///     deserialized_responses::EncryptionInfo,
1002    ///     ruma::{
1003    ///         events::room::{
1004    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1005    ///         },
1006    ///         push::Action,
1007    ///     },
1008    /// };
1009    ///
1010    /// # async fn example(client: Client) {
1011    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1012    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1013    ///
1014    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1015    /// // to distinguish between unencrypted events and events that were decrypted
1016    /// // by the SDK.
1017    /// let _ = client
1018    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1019    ///     );
1020    ///
1021    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1022    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1023    /// // should be highlighted in the timeline.
1024    /// let _ =
1025    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1026    ///
1027    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1028    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1029    /// # }
1030    /// ```
1031    ///
1032    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1033    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1034    where
1035        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1036        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1037    {
1038        self.observe_room_events_impl(None)
1039    }
1040
1041    /// Observe a specific room, and event type.
1042    ///
1043    /// This method works the same way as [`Client::observe_events`], except
1044    /// that the observability will only be applied for events in the room with
1045    /// the specified ID. See that method for more details.
1046    ///
1047    /// Be careful that only the most recent value can be observed. Subscribers
1048    /// are notified when a new value is sent, but there is no guarantee
1049    /// that they will see all values.
1050    pub fn observe_room_events<Ev, Ctx>(
1051        &self,
1052        room_id: &RoomId,
1053    ) -> ObservableEventHandler<(Ev, Ctx)>
1054    where
1055        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1056        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1057    {
1058        self.observe_room_events_impl(Some(room_id.to_owned()))
1059    }
1060
1061    /// Shared implementation for `Client::observe_events` and
1062    /// `Client::observe_room_events`.
1063    fn observe_room_events_impl<Ev, Ctx>(
1064        &self,
1065        room_id: Option<OwnedRoomId>,
1066    ) -> ObservableEventHandler<(Ev, Ctx)>
1067    where
1068        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1069        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1070    {
1071        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1072        // new value.
1073        let shared_observable = SharedObservable::new(None);
1074
1075        ObservableEventHandler::new(
1076            shared_observable.clone(),
1077            self.event_handler_drop_guard(self.add_event_handler_impl(
1078                move |event: Ev, context: Ctx| {
1079                    shared_observable.set(Some((event, context)));
1080
1081                    ready(())
1082                },
1083                room_id,
1084            )),
1085        )
1086    }
1087
1088    /// Subscribe to future `beacon_info` updates for the current user across
1089    /// all rooms.
1090    ///
1091    /// This stream is push-only: it emits only future updates observed during
1092    /// sync processing and does not replay existing state.
1093    pub fn observe_own_beacon_info_updates(
1094        &self,
1095    ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1096        let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1097        let mut stream = observer.subscribe();
1098        let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1099        Ok(async_stream::stream! {
1100            let _observer = observer;
1101
1102            while let Some((event, room)) = stream.next().await {
1103                if event.state_key != own_user_id {
1104                    continue;
1105                }
1106                yield BeaconInfoUpdate {
1107                    room_id: room.room_id().to_owned(),
1108                    event_id: event.event_id,
1109                    content: event.content,
1110                };
1111            }
1112        })
1113    }
1114
1115    /// Remove the event handler associated with the handle.
1116    ///
1117    /// Note that you **must not** call `remove_event_handler` from the
1118    /// non-async part of an event handler, that is:
1119    ///
1120    /// ```ignore
1121    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1122    ///     // ⚠ this will cause a deadlock ⚠
1123    ///     client.remove_event_handler(handle);
1124    ///
1125    ///     async move {
1126    ///         // removing the event handler here is fine
1127    ///         client.remove_event_handler(handle);
1128    ///     }
1129    /// })
1130    /// ```
1131    ///
1132    /// Note also that handlers that remove themselves will still execute with
1133    /// events received in the same sync cycle.
1134    ///
1135    /// # Arguments
1136    ///
1137    /// `handle` - The [`EventHandlerHandle`] that is returned when
1138    /// registering the event handler with [`Client::add_event_handler`].
1139    ///
1140    /// # Examples
1141    ///
1142    /// ```no_run
1143    /// # use url::Url;
1144    /// # use tokio::sync::mpsc;
1145    /// #
1146    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1147    /// #
1148    /// use matrix_sdk::{
1149    ///     Client, event_handler::EventHandlerHandle,
1150    ///     ruma::events::room::member::SyncRoomMemberEvent,
1151    /// };
1152    /// #
1153    /// # futures_executor::block_on(async {
1154    /// # let client = matrix_sdk::Client::builder()
1155    /// #     .homeserver_url(homeserver)
1156    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1157    /// #     .build()
1158    /// #     .await
1159    /// #     .unwrap();
1160    ///
1161    /// client.add_event_handler(
1162    ///     |ev: SyncRoomMemberEvent,
1163    ///      client: Client,
1164    ///      handle: EventHandlerHandle| async move {
1165    ///         // Common usage: Check arriving Event is the expected one
1166    ///         println!("Expected RoomMemberEvent received!");
1167    ///         client.remove_event_handler(handle);
1168    ///     },
1169    /// );
1170    /// # });
1171    /// ```
1172    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1173        self.inner.event_handlers.remove(handle);
1174    }
1175
1176    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1177    /// the given handle.
1178    ///
1179    /// When the returned value is dropped, the event handler will be removed.
1180    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1181        EventHandlerDropGuard::new(handle, self.clone())
1182    }
1183
1184    /// Add an arbitrary value for use as event handler context.
1185    ///
1186    /// The value can be obtained in an event handler by adding an argument of
1187    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1188    ///
1189    /// If a value of the same type has been added before, it will be
1190    /// overwritten.
1191    ///
1192    /// # Examples
1193    ///
1194    /// ```no_run
1195    /// use matrix_sdk::{
1196    ///     Room, event_handler::Ctx,
1197    ///     ruma::events::room::message::SyncRoomMessageEvent,
1198    /// };
1199    /// # #[derive(Clone)]
1200    /// # struct SomeType;
1201    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1202    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1203    /// # futures_executor::block_on(async {
1204    /// # let client = matrix_sdk::Client::builder()
1205    /// #     .homeserver_url(homeserver)
1206    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1207    /// #     .build()
1208    /// #     .await
1209    /// #     .unwrap();
1210    ///
1211    /// // Handle used to send messages to the UI part of the app
1212    /// let my_gui_handle: SomeType = obtain_gui_handle();
1213    ///
1214    /// client.add_event_handler_context(my_gui_handle.clone());
1215    /// client.add_event_handler(
1216    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1217    ///         async move {
1218    ///             // gui_handle.send(DisplayMessage { message: ev });
1219    ///         }
1220    ///     },
1221    /// );
1222    /// # });
1223    /// ```
1224    pub fn add_event_handler_context<T>(&self, ctx: T)
1225    where
1226        T: Clone + Send + Sync + 'static,
1227    {
1228        self.inner.event_handlers.add_context(ctx);
1229    }
1230
1231    /// Register a handler for a notification.
1232    ///
1233    /// Similar to [`Client::add_event_handler`], but only allows functions
1234    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1235    /// [`Client`] for now.
1236    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1237    where
1238        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1239        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1240    {
1241        self.inner.notification_handlers.write().await.push(Box::new(
1242            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1243        ));
1244
1245        self
1246    }
1247
1248    /// Subscribe to all updates for the room with the given ID.
1249    ///
1250    /// The returned receiver will receive a new message for each sync response
1251    /// that contains updates for that room.
1252    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1253        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1254            btree_map::Entry::Vacant(entry) => {
1255                let (tx, rx) = broadcast::channel(8);
1256                entry.insert(tx);
1257                rx
1258            }
1259            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1260        }
1261    }
1262
1263    /// Subscribe to all updates to all rooms, whenever any has been received in
1264    /// a sync response.
1265    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1266        self.inner.room_updates_sender.subscribe()
1267    }
1268
1269    pub(crate) async fn notification_handlers(
1270        &self,
1271    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1272        self.inner.notification_handlers.read().await
1273    }
1274
1275    /// Get all the rooms the client knows about.
1276    ///
1277    /// This will return the list of joined, invited, and left rooms.
1278    pub fn rooms(&self) -> Vec<Room> {
1279        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1280    }
1281
1282    /// Get all the rooms the client knows about, filtered by room state.
1283    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1284        self.base_client()
1285            .rooms_filtered(filter)
1286            .into_iter()
1287            .map(|room| Room::new(self.clone(), room))
1288            .collect()
1289    }
1290
1291    /// Get a stream of all the rooms, in addition to the existing rooms.
1292    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1293        let (rooms, stream) = self.base_client().rooms_stream();
1294
1295        let map_room = |room| Room::new(self.clone(), room);
1296
1297        (
1298            rooms.into_iter().map(map_room).collect(),
1299            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1300        )
1301    }
1302
1303    /// Returns the joined rooms this client knows about.
1304    pub fn joined_rooms(&self) -> Vec<Room> {
1305        self.rooms_filtered(RoomStateFilter::JOINED)
1306    }
1307
1308    /// Returns the invited rooms this client knows about.
1309    pub fn invited_rooms(&self) -> Vec<Room> {
1310        self.rooms_filtered(RoomStateFilter::INVITED)
1311    }
1312
1313    /// Returns the left rooms this client knows about.
1314    pub fn left_rooms(&self) -> Vec<Room> {
1315        self.rooms_filtered(RoomStateFilter::LEFT)
1316    }
1317
1318    /// Returns the joined space rooms this client knows about.
1319    pub fn joined_space_rooms(&self) -> Vec<Room> {
1320        self.base_client()
1321            .rooms_filtered(RoomStateFilter::JOINED)
1322            .into_iter()
1323            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1324            .collect()
1325    }
1326
1327    /// Get a room with the given room id.
1328    ///
1329    /// # Arguments
1330    ///
1331    /// `room_id` - The unique id of the room that should be fetched.
1332    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1333        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1334    }
1335
1336    /// Gets the preview of a room, whether the current user has joined it or
1337    /// not.
1338    pub async fn get_room_preview(
1339        &self,
1340        room_or_alias_id: &RoomOrAliasId,
1341        via: Vec<OwnedServerName>,
1342    ) -> Result<RoomPreview> {
1343        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1344            Ok(room_id) => room_id.to_owned(),
1345            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1346        };
1347
1348        if let Some(room) = self.get_room(&room_id) {
1349            // The cached data can only be trusted if the room state is joined or
1350            // banned: for invite and knock rooms, no updates will be received
1351            // for the rooms after the invite/knock action took place so we may
1352            // have very out to date data for important fields such as
1353            // `join_rule`. For left rooms, the homeserver should return the latest info.
1354            match room.state() {
1355                RoomState::Joined | RoomState::Banned => {
1356                    return Ok(RoomPreview::from_known_room(&room).await);
1357                }
1358                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1359            }
1360        }
1361
1362        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1363    }
1364
1365    /// Resolve a room alias to a room id and a list of servers which know
1366    /// about it.
1367    ///
1368    /// # Arguments
1369    ///
1370    /// `room_alias` - The room alias to be resolved.
1371    pub async fn resolve_room_alias(
1372        &self,
1373        room_alias: &RoomAliasId,
1374    ) -> HttpResult<get_alias::v3::Response> {
1375        let request = get_alias::v3::Request::new(room_alias.to_owned());
1376        self.send(request).await
1377    }
1378
1379    /// Checks if a room alias is not in use yet.
1380    ///
1381    /// Returns:
1382    /// - `Ok(true)` if the room alias is available.
1383    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1384    ///   status code).
1385    /// - An `Err` otherwise.
1386    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1387        match self.resolve_room_alias(alias).await {
1388            // The room alias was resolved, so it's already in use.
1389            Ok(_) => Ok(false),
1390            Err(error) => {
1391                match error.client_api_error_kind() {
1392                    // The room alias wasn't found, so it's available.
1393                    Some(ErrorKind::NotFound) => Ok(true),
1394                    _ => Err(error),
1395                }
1396            }
1397        }
1398    }
1399
1400    /// Adds a new room alias associated with a room to the room directory.
1401    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1402        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1403        self.send(request).await?;
1404        Ok(())
1405    }
1406
1407    /// Removes a room alias from the room directory.
1408    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1409        let request = delete_alias::v3::Request::new(alias.to_owned());
1410        self.send(request).await?;
1411        Ok(())
1412    }
1413
1414    /// Update the homeserver from the login response well-known if needed.
1415    ///
1416    /// # Arguments
1417    ///
1418    /// * `login_well_known` - The `well_known` field from a successful login
1419    ///   response.
1420    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1421        if self.inner.respect_login_well_known
1422            && let Some(well_known) = login_well_known
1423            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1424        {
1425            self.set_homeserver(homeserver);
1426        }
1427    }
1428
1429    /// Similar to [`Client::restore_session_with`], with
1430    /// [`RoomLoadSettings::default()`].
1431    ///
1432    /// # Panics
1433    ///
1434    /// Panics if a session was already restored or logged in.
1435    #[instrument(skip_all)]
1436    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1437        self.restore_session_with(session, RoomLoadSettings::default()).await
1438    }
1439
1440    /// Restore a session previously logged-in using one of the available
1441    /// authentication APIs. The number of rooms to restore is controlled by
1442    /// [`RoomLoadSettings`].
1443    ///
1444    /// See the documentation of the corresponding authentication API's
1445    /// `restore_session` method for more information.
1446    ///
1447    /// # Panics
1448    ///
1449    /// Panics if a session was already restored or logged in.
1450    #[instrument(skip_all)]
1451    pub async fn restore_session_with(
1452        &self,
1453        session: impl Into<AuthSession>,
1454        room_load_settings: RoomLoadSettings,
1455    ) -> Result<()> {
1456        let session = session.into();
1457        match session {
1458            AuthSession::Matrix(session) => {
1459                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1460            }
1461            AuthSession::OAuth(session) => {
1462                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1463            }
1464        }
1465    }
1466
1467    /// Refresh the access token using the authentication API used to log into
1468    /// this session.
1469    ///
1470    /// See the documentation of the authentication API's `refresh_access_token`
1471    /// method for more information.
1472    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1473        let Some(auth_api) = self.auth_api() else {
1474            return Err(RefreshTokenError::RefreshTokenRequired);
1475        };
1476
1477        match auth_api {
1478            AuthApi::Matrix(api) => {
1479                trace!("Token refresh: Using the homeserver.");
1480                Box::pin(api.refresh_access_token()).await?;
1481            }
1482            AuthApi::OAuth(api) => {
1483                trace!("Token refresh: Using OAuth 2.0.");
1484                Box::pin(api.refresh_access_token()).await?;
1485            }
1486        }
1487
1488        Ok(())
1489    }
1490
1491    /// Log out the current session using the proper authentication API.
1492    ///
1493    /// # Errors
1494    ///
1495    /// Returns an error if the session is not authenticated or if an error
1496    /// occurred while making the request to the server.
1497    pub async fn logout(&self) -> Result<(), Error> {
1498        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1499        match auth_api {
1500            AuthApi::Matrix(matrix_auth) => {
1501                matrix_auth.logout().await?;
1502                Ok(())
1503            }
1504            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1505        }
1506    }
1507
1508    /// Get or upload a sync filter.
1509    ///
1510    /// This method will either get a filter ID from the store or upload the
1511    /// filter definition to the homeserver and return the new filter ID.
1512    ///
1513    /// # Arguments
1514    ///
    /// * `filter_name` - The unique name of the filter, this name will be used
    ///   locally to store and identify the filter ID returned by the server.
    ///
    /// * `definition` - The filter definition that should be uploaded to the
    ///   server if no filter ID can be found in the store.
1520    ///
1521    /// # Examples
1522    ///
1523    /// ```no_run
1524    /// # use matrix_sdk::{
1525    /// #    Client, config::SyncSettings,
1526    /// #    ruma::api::client::{
1527    /// #        filter::{
1528    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1529    /// #        },
1530    /// #        sync::sync_events::v3::Filter,
1531    /// #    }
1532    /// # };
1533    /// # use url::Url;
1534    /// # async {
1535    /// # let homeserver = Url::parse("http://example.com").unwrap();
1536    /// # let client = Client::new(homeserver).await.unwrap();
1537    /// let mut filter = FilterDefinition::default();
1538    ///
1539    /// // Let's enable member lazy loading.
1540    /// filter.room.state.lazy_load_options =
1541    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1542    ///
1543    /// let filter_id = client
1544    ///     .get_or_upload_filter("sync", filter)
1545    ///     .await
1546    ///     .unwrap();
1547    ///
1548    /// let sync_settings = SyncSettings::new()
1549    ///     .filter(Filter::FilterId(filter_id));
1550    ///
1551    /// let response = client.sync_once(sync_settings).await.unwrap();
    /// # };
    /// ```
1553    #[instrument(skip(self, definition))]
1554    pub async fn get_or_upload_filter(
1555        &self,
1556        filter_name: &str,
1557        definition: FilterDefinition,
1558    ) -> Result<String> {
1559        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1560            debug!("Found filter locally");
1561            Ok(filter)
1562        } else {
1563            debug!("Didn't find filter locally");
1564            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1565            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1566            let response = self.send(request).await?;
1567
1568            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1569
1570            Ok(response.filter_id)
1571        }
1572    }
1573
1574    /// Prepare to join a room by ID, by getting the current details about it
1575    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1576        let room = self.get_room(room_id)?;
1577
1578        let inviter = match room.invite_details().await {
1579            Ok(details) => details.inviter,
1580            Err(Error::WrongRoomState(_)) => None,
1581            Err(e) => {
1582                warn!("Error fetching invite details for room: {e:?}");
1583                None
1584            }
1585        };
1586
1587        Some(PreJoinRoomInfo { inviter })
1588    }
1589
1590    /// Finish joining a room.
1591    ///
1592    /// If the room was an invite that should be marked as a DM, will include it
1593    /// in the DM event after creating the joined room.
1594    ///
1595    /// If encrypted history sharing is enabled, will check to see if we have a
1596    /// key bundle, and import it if so.
1597    ///
1598    /// # Arguments
1599    ///
1600    /// * `room_id` - The `RoomId` of the room that was joined.
1601    /// * `pre_join_room_info` - Information about the room before we joined.
1602    async fn finish_join_room(
1603        &self,
1604        room_id: &RoomId,
1605        pre_join_room_info: Option<PreJoinRoomInfo>,
1606    ) -> Result<Room> {
1607        info!(?room_id, ?pre_join_room_info, "Completing room join");
1608        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1609            room.state() == RoomState::Invited
1610                && room.is_direct().await.unwrap_or_else(|e| {
1611                    warn!(%room_id, "is_direct() failed: {e}");
1612                    false
1613                })
1614        } else {
1615            false
1616        };
1617
1618        let base_room = self
1619            .base_client()
1620            .room_joined(
1621                room_id,
1622                pre_join_room_info
1623                    .as_ref()
1624                    .and_then(|info| info.inviter.as_ref())
1625                    .map(|i| i.user_id().to_owned()),
1626            )
1627            .await?;
1628        let room = Room::new(self.clone(), base_room);
1629
1630        if mark_as_dm {
1631            room.set_is_direct(true).await?;
1632        }
1633
1634        // If we joined following an invite, check if we had previously received a key
1635        // bundle from the inviter, and import it if so.
1636        //
1637        // It's important that we only do this once `BaseClient::room_joined` has
1638        // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1639        // race.
1640        #[cfg(feature = "e2e-encryption")]
1641        if self.inner.enable_share_history_on_invite
1642            && let Some(inviter) =
1643                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1644        {
1645            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1646                .await?;
1647        }
1648
1649        // Suppress "unused variable" and "unused field" lints
1650        #[cfg(not(feature = "e2e-encryption"))]
1651        let _ = pre_join_room_info.map(|i| i.inviter);
1652
1653        Ok(room)
1654    }
1655
1656    /// Join a room by `RoomId`.
1657    ///
1658    /// Returns the `Room` in the joined state.
1659    ///
1660    /// # Arguments
1661    ///
1662    /// * `room_id` - The `RoomId` of the room to be joined.
1663    #[instrument(skip(self))]
1664    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1665        // See who invited us to this room, if anyone. Note we have to do this before
1666        // making the `/join` request, otherwise we could race against the sync.
1667        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1668
1669        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1670        let response = self.send(request).await?;
1671        self.finish_join_room(&response.room_id, pre_join_info).await
1672    }
1673
1674    /// Join a room by `RoomOrAliasId`.
1675    ///
1676    /// Returns the `Room` in the joined state.
1677    ///
1678    /// # Arguments
1679    ///
1680    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1681    ///   alias looks like `#name:example.com`.
1682    /// * `server_names` - The server names to be used for resolving the alias,
1683    ///   if needs be.
1684    #[instrument(skip(self))]
1685    pub async fn join_room_by_id_or_alias(
1686        &self,
1687        alias: &RoomOrAliasId,
1688        server_names: &[OwnedServerName],
1689    ) -> Result<Room> {
1690        let pre_join_info = {
1691            match alias.try_into() {
1692                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1693                Err(_) => {
1694                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1695                    // responding to an invitation to the room, and therefore don't need to handle
1696                    // things that happen as a result of invites.
1697                    None
1698                }
1699            }
1700        };
1701        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1702            via: server_names.to_owned(),
1703        });
1704        let response = self.send(request).await?;
1705        self.finish_join_room(&response.room_id, pre_join_info).await
1706    }
1707
1708    /// Search the homeserver's directory of public rooms.
1709    ///
1710    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1711    /// a `get_public_rooms::Response`.
1712    ///
1713    /// # Arguments
1714    ///
1715    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1716    ///
1717    /// * `since` - Pagination token from a previous request.
1718    ///
1719    /// * `server` - The name of the server, if `None` the requested server is
1720    ///   used.
1721    ///
1722    /// # Examples
1723    /// ```no_run
1724    /// use matrix_sdk::Client;
1725    /// # use url::Url;
1726    /// # let homeserver = Url::parse("http://example.com").unwrap();
1727    /// # let limit = Some(10);
1728    /// # let since = Some("since token");
1729    /// # let server = Some("servername.com".try_into().unwrap());
1730    /// # async {
1731    /// let mut client = Client::new(homeserver).await.unwrap();
1732    ///
1733    /// client.public_rooms(limit, since, server).await;
1734    /// # };
1735    /// ```
1736    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1737    pub async fn public_rooms(
1738        &self,
1739        limit: Option<u32>,
1740        since: Option<&str>,
1741        server: Option<&ServerName>,
1742    ) -> HttpResult<get_public_rooms::v3::Response> {
1743        let limit = limit.map(UInt::from);
1744
1745        let request = assign!(get_public_rooms::v3::Request::new(), {
1746            limit,
1747            since: since.map(ToOwned::to_owned),
1748            server: server.map(ToOwned::to_owned),
1749        });
1750        self.send(request).await
1751    }
1752
1753    /// Create a room with the given parameters.
1754    ///
1755    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1756    /// created room.
1757    ///
1758    /// If you want to create a direct message with one specific user, you can
1759    /// use [`create_dm`][Self::create_dm], which is more convenient than
1760    /// assembling the [`create_room::v3::Request`] yourself.
1761    ///
1762    /// If the `is_direct` field of the request is set to `true` and at least
1763    /// one user is invited, the room will be automatically added to the direct
1764    /// rooms in the account data.
1765    ///
1766    /// # Examples
1767    ///
1768    /// ```no_run
1769    /// use matrix_sdk::{
1770    ///     Client,
1771    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1772    /// };
1773    /// # use url::Url;
1774    /// #
1775    /// # async {
1776    /// # let homeserver = Url::parse("http://example.com").unwrap();
1777    /// let request = CreateRoomRequest::new();
1778    /// let client = Client::new(homeserver).await.unwrap();
1779    /// assert!(client.create_room(request).await.is_ok());
1780    /// # };
1781    /// ```
1782    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1783        let invite = request.invite.clone();
1784        let is_direct_room = request.is_direct;
1785        let response = self.send(request).await?;
1786
1787        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1788
1789        let joined_room = Room::new(self.clone(), base_room);
1790
1791        if is_direct_room
1792            && !invite.is_empty()
1793            && let Err(error) =
1794                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1795        {
1796            // FIXME: Retry in the background
1797            error!("Failed to mark room as DM: {error}");
1798        }
1799
1800        Ok(joined_room)
1801    }
1802
1803    /// Create a DM room.
1804    ///
1805    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1806    /// given user being invited, the room marked `is_direct` and both the
1807    /// creator and invitee getting the default maximum power level.
1808    ///
1809    /// If the `e2e-encryption` feature is enabled, the room will also be
1810    /// encrypted.
1811    ///
1812    /// # Arguments
1813    ///
1814    /// * `user_id` - The ID of the user to create a DM for.
1815    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1816        #[cfg(feature = "e2e-encryption")]
1817        let initial_state = vec![
1818            InitialStateEvent::with_empty_state_key(
1819                RoomEncryptionEventContent::with_recommended_defaults(),
1820            )
1821            .to_raw_any(),
1822        ];
1823
1824        #[cfg(not(feature = "e2e-encryption"))]
1825        let initial_state = vec![];
1826
1827        let request = assign!(create_room::v3::Request::new(), {
1828            invite: vec![user_id.to_owned()],
1829            is_direct: true,
1830            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1831            initial_state,
1832        });
1833
1834        self.create_room(request).await
1835    }
1836
    /// Get the first existing DM room with the given user, if any.
    ///
    /// This is the first item yielded by [`get_dm_rooms`][Self::get_dm_rooms];
    /// `None` is returned when that iterator yields nothing.
    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
        self.get_dm_rooms(user_id).next()
    }
1841
1842    /// Get an iterator with the existing DM rooms for the given user.
1843    pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1844        let rooms = self.joined_rooms();
1845
1846        let dm_definition = &self.base_client().dm_room_definition;
1847
1848        // Find the room we share with the `user_id` and only with `user_id`
1849        let rooms = rooms.into_iter().filter(move |r| {
1850            let targets = r.direct_targets();
1851            let targets_match =
1852                targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1853            match dm_definition {
1854                DmRoomDefinition::MatrixSpec => targets_match,
1855                DmRoomDefinition::TwoMembers => {
1856                    let service_members_count =
1857                        r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1858                    let active_non_service_members =
1859                        r.active_members_count().saturating_sub(service_members_count);
1860                    targets_match && active_non_service_members <= 2
1861                }
1862            }
1863        });
1864
1865        trace!(?user_id, ?rooms, "Found DM rooms with user");
1866        rooms
1867    }
1868
1869    /// Search the homeserver's directory for public rooms with a filter.
1870    ///
1871    /// # Arguments
1872    ///
1873    /// * `room_search` - The easiest way to create this request is using the
1874    ///   `get_public_rooms_filtered::Request` itself.
1875    ///
1876    /// # Examples
1877    ///
1878    /// ```no_run
1879    /// # use url::Url;
1880    /// # use matrix_sdk::Client;
1881    /// # async {
1882    /// # let homeserver = Url::parse("http://example.com")?;
1883    /// use matrix_sdk::ruma::{
1884    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1885    /// };
1886    /// # let mut client = Client::new(homeserver).await?;
1887    ///
1888    /// let mut filter = Filter::new();
1889    /// filter.generic_search_term = Some("rust".to_owned());
1890    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1891    /// request.filter = filter;
1892    ///
1893    /// let response = client.public_rooms_filtered(request).await?;
1894    ///
1895    /// for room in response.chunk {
1896    ///     println!("Found room {room:?}");
1897    /// }
1898    /// # anyhow::Ok(()) };
1899    /// ```
    pub async fn public_rooms_filtered(
        &self,
        request: get_public_rooms_filtered::v3::Request,
    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
        // The caller-built request is forwarded untouched; whatever `send`
        // resolves to is returned as-is.
        self.send(request).await
    }
1906
1907    /// Send an arbitrary request to the server, without updating client state.
1908    ///
1909    /// **Warning:** Because this method *does not* update the client state, it
1910    /// is important to make sure that you account for this yourself, and
1911    /// use wrapper methods where available.  This method should *only* be
1912    /// used if a wrapper method for the endpoint you'd like to use is not
1913    /// available.
1914    ///
1915    /// # Arguments
1916    ///
1917    /// * `request` - A filled out and valid request for the endpoint to be hit
1918    ///
1919    /// * `timeout` - An optional request timeout setting, this overrides the
1920    ///   default request setting if one was set.
1921    ///
1922    /// # Examples
1923    ///
1924    /// ```no_run
1925    /// # use matrix_sdk::{Client, config::SyncSettings};
1926    /// # use url::Url;
1927    /// # async {
1928    /// # let homeserver = Url::parse("http://localhost:8080")?;
1929    /// # let mut client = Client::new(homeserver).await?;
1930    /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1931    ///
1932    /// // First construct the request you want to make
1933    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1934    /// // for all available Endpoints
1935    /// let user_id = owned_user_id!("@example:localhost");
1936    /// let request = profile::get_profile::v3::Request::new(user_id);
1937    ///
1938    /// // Start the request using Client::send()
1939    /// let response = client.send(request).await?;
1940    ///
1941    /// // Check the corresponding Response struct to find out what types are
1942    /// // returned
1943    /// # anyhow::Ok(()) };
1944    /// ```
    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
    where
        Request: OutgoingRequest + Clone + Debug,
        Request::Authentication: SupportedAuthScheme,
        Request::PathBuilder: SupportedPathBuilder,
        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
    {
        // Nothing is sent from this (non-async) function: it only bundles the
        // request with a clone of the client into a `SendRequest` value that
        // is handed back to the caller.
        SendRequest {
            client: self.clone(),
            request,
            // No per-request configuration is attached at construction time.
            config: None,
            // Starts from the default value of the progress observable.
            send_progress: Default::default(),
        }
    }
1960
1961    pub(crate) async fn send_inner<Request>(
1962        &self,
1963        request: Request,
1964        config: Option<RequestConfig>,
1965        send_progress: SharedObservable<TransmissionProgress>,
1966    ) -> HttpResult<Request::IncomingResponse>
1967    where
1968        Request: OutgoingRequest + Debug,
1969        Request::Authentication: SupportedAuthScheme,
1970        Request::PathBuilder: SupportedPathBuilder,
1971        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1972        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1973    {
1974        let homeserver = self.homeserver().to_string();
1975        let access_token = self.access_token();
1976        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1977
1978        let path_builder_input =
1979            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1980
1981        let result = self
1982            .inner
1983            .http_client
1984            .send(
1985                request,
1986                config,
1987                homeserver,
1988                access_token.as_deref(),
1989                path_builder_input,
1990                send_progress,
1991            )
1992            .await;
1993
1994        if let Err(Some(ErrorKind::UnknownToken { .. })) =
1995            result.as_ref().map_err(HttpError::client_api_error_kind)
1996            && let Some(access_token) = &access_token
1997        {
1998            // Mark the access token as expired.
1999            self.auth_ctx().set_access_token_expired(access_token);
2000        }
2001
2002        result
2003    }
2004
2005    fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2006        _ = self
2007            .inner
2008            .auth_ctx
2009            .session_change_sender
2010            .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2011    }
2012
    /// Fetches server versions from network; no caching.
    ///
    /// # Arguments
    ///
    /// * `request_config` - Optional request configuration, passed on
    ///   unchanged to the underlying request.
    pub async fn fetch_server_versions(
        &self,
        request_config: Option<RequestConfig>,
    ) -> HttpResult<get_supported_versions::Response> {
        // Since this was called by the user, try to refresh the access token if
        // necessary.
        //
        // This is why `failsafe` is `false` here.
        self.fetch_server_versions_inner(false, request_config).await
    }
2022
2023    /// Fetches server versions from network; no caching.
2024    ///
2025    /// If the access token is expired and `failsafe` is `false`, this will
2026    /// attempt to refresh the access token, otherwise this will try to make an
2027    /// unauthenticated request instead.
2028    pub(crate) async fn fetch_server_versions_inner(
2029        &self,
2030        failsafe: bool,
2031        request_config: Option<RequestConfig>,
2032    ) -> HttpResult<get_supported_versions::Response> {
2033        if !failsafe {
2034            // `Client::send()` handles refreshing access tokens.
2035            return self
2036                .send(get_supported_versions::Request::new())
2037                .with_request_config(request_config)
2038                .await;
2039        }
2040
2041        let homeserver = self.homeserver().to_string();
2042
2043        // If we have a fresh access token, try with it first.
2044        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2045            && self.auth_ctx().has_valid_access_token()
2046            && let Some(access_token) = self.access_token()
2047        {
2048            let result = self
2049                .inner
2050                .http_client
2051                .send(
2052                    get_supported_versions::Request::new(),
2053                    request_config,
2054                    homeserver.clone(),
2055                    Some(&access_token),
2056                    (),
2057                    Default::default(),
2058                )
2059                .await;
2060
2061            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2062                result.as_ref().map_err(HttpError::client_api_error_kind)
2063            {
2064                // If the access token is actually expired, mark it as expired and fallback to
2065                // the unauthenticated request below.
2066                self.auth_ctx().set_access_token_expired(&access_token);
2067            } else {
2068                // If the request succeeded or it's an other error, just stop now.
2069                return result;
2070            }
2071        }
2072
2073        // Try without authentication.
2074        self.inner
2075            .http_client
2076            .send(
2077                get_supported_versions::Request::new(),
2078                request_config,
2079                homeserver.clone(),
2080                None,
2081                (),
2082                Default::default(),
2083            )
2084            .await
2085    }
2086
2087    /// Fetches client well_known from network; no caching.
2088    ///
2089    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2090    ///    well-known contents.
2091    /// 2. If it's not, we try extracting the server name from the
2092    ///    [`Client::user_id`] and building the server URL from it.
2093    /// 3. If we couldn't get the well-known contents with either the explicit
2094    ///    server name or the implicit extracted one, we try the homeserver URL
2095    ///    as a last resort.
2096    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2097        let homeserver = self.homeserver();
2098        let scheme = homeserver.scheme();
2099
2100        // Use the server name, either an explicit one or an implicit one taken from
2101        // the user id: sometimes we'll have only the homeserver url available and no
2102        // server name, but the server name can be extracted from the current user id.
2103        let server_url = self
2104            .server()
2105            .map(|server| server.to_string())
2106            // If the server name wasn't available, extract it from the user id and build a URL:
2107            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2108            // will be the same for the public server url, lacking a better candidate.
2109            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2110
2111        // If the server name is available, first try using it
2112        let response = if let Some(server_url) = server_url {
2113            // First try using the server name
2114            self.fetch_client_well_known_with_url(server_url).await
2115        } else {
2116            None
2117        };
2118
2119        // If we didn't get a well-known value yet, try with the homeserver url instead:
2120        if response.is_none() {
2121            // Sometimes people configure their well-known directly on the homeserver so use
2122            // this as a fallback when the server name is unknown.
2123            warn!(
2124                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2125            );
2126            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2127        } else {
2128            response
2129        }
2130    }
2131
2132    async fn fetch_client_well_known_with_url(
2133        &self,
2134        url: String,
2135    ) -> Option<discover_homeserver::Response> {
2136        let well_known = self
2137            .inner
2138            .http_client
2139            .send(
2140                discover_homeserver::Request::new(),
2141                Some(RequestConfig::short_retry()),
2142                url,
2143                None,
2144                (),
2145                Default::default(),
2146            )
2147            .await;
2148
2149        match well_known {
2150            Ok(well_known) => Some(well_known),
2151            Err(http_error) => {
2152                // It is perfectly valid to not have a well-known file.
2153                // Maybe we should check for a specific error code to be sure?
2154                warn!("Failed to fetch client well-known: {http_error}");
2155                None
2156            }
2157        }
2158    }
2159
2160    /// Load supported versions from storage, or fetch them from network and
2161    /// cache them.
2162    ///
2163    /// If `failsafe` is true, this will try to minimize side effects to avoid
2164    /// possible deadlocks.
2165    async fn fetch_supported_versions(
2166        &self,
2167        failsafe: bool,
2168    ) -> HttpResult<SupportedVersionsResponse> {
2169        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2170        let supported_versions = SupportedVersionsResponse {
2171            versions: server_versions.versions,
2172            unstable_features: server_versions.unstable_features,
2173        };
2174
2175        Ok(supported_versions)
2176    }
2177
2178    /// Get the Matrix versions and features supported by the homeserver by
2179    /// fetching them from the server or the cache.
2180    ///
2181    /// This is equivalent to calling both [`Client::server_versions()`] and
2182    /// [`Client::unstable_features()`]. To always fetch the result from the
2183    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2184    /// and then `.as_supported_versions()` on the response.
2185    ///
2186    /// # Examples
2187    ///
2188    /// ```no_run
2189    /// use ruma::api::{FeatureFlag, MatrixVersion};
2190    /// # use matrix_sdk::{Client, config::SyncSettings};
2191    /// # use url::Url;
2192    /// # async {
2193    /// # let homeserver = Url::parse("http://localhost:8080")?;
2194    /// # let mut client = Client::new(homeserver).await?;
2195    ///
2196    /// let supported = client.supported_versions().await?;
2197    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2198    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2199    ///
2200    /// let msc_x_feature = FeatureFlag::from("msc_x");
2201    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2202    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2203    /// # anyhow::Ok(()) };
2204    /// ```
    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
        // Public entry point: always runs with `failsafe` disabled.
        self.supported_versions_inner(false).await
    }
2208
2209    /// Get the Matrix versions and features supported by the homeserver by
2210    /// fetching them from the server or the cache.
2211    ///
2212    /// If `failsafe` is true, this will try to minimize side effects to avoid
2213    /// possible deadlocks.
2214    pub(crate) async fn supported_versions_inner(
2215        &self,
2216        failsafe: bool,
2217    ) -> HttpResult<SupportedVersions> {
2218        match self.supported_versions_cached_inner(failsafe).await {
2219            Ok(Some(value)) => {
2220                return Ok(value);
2221            }
2222            Ok(None) => {
2223                // The cache is empty, make a request.
2224            }
2225            Err(error) => {
2226                warn!("error when loading cached supported versions: {error}");
2227                // Fallthrough to make a request.
2228            }
2229        }
2230
2231        self.refresh_supported_versions_cache(failsafe).await
2232    }
2233
    /// Refresh the Matrix versions and features supported by the homeserver in
    /// the cache.
    ///
    /// Refreshes are serialized through the cache's `refresh_lock`. A call
    /// that has to wait for the lock first looks at what the previous refresh
    /// left behind, and only makes its own request if that didn't produce a
    /// usable value.
    ///
    /// If `failsafe` is true, this will try to minimize side effects to avoid
    /// possible deadlocks.
    ///
    /// # Errors
    ///
    /// Returns [`HttpError::Cached`] wrapping either the error of the request
    /// made by this call, or the error recorded by the previous refresh when
    /// this call had to wait for it.
    async fn refresh_supported_versions_cache(
        &self,
        failsafe: bool,
    ) -> HttpResult<SupportedVersions> {
        let cached_supported_versions = &self.inner.caches.supported_versions;

        // First try to take the lock without waiting; this succeeds when no
        // other refresh currently holds it.
        let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
            Ok(guard) => guard,
            Err(_) => {
                // There is already a refresh in progress, wait for it to finish.
                let guard = cached_supported_versions.refresh_lock.lock().await;

                if let Err(error) = guard.as_ref() {
                    // There was an error in the previous refresh, return it.
                    return Err(HttpError::Cached(error.clone()));
                }

                // Reuse the data if it was cached and it hasn't expired.
                if let CachedValue::Cached(value) = cached_supported_versions.value()
                    && !value.has_expired()
                {
                    return Ok(value.into_data());
                }

                // The data wasn't cached or has expired, we need to make another request.
                guard
            }
        };

        let response = match self.fetch_supported_versions(failsafe).await {
            Ok(response) => {
                // Record the success in the lock's value; callers that waited
                // on the lock inspect it once they acquire the lock.
                *supported_versions_guard = Ok(());
                TtlValue::new(response)
            }
            Err(error) => {
                // Record the failure in the lock's value as well, sharing the
                // same error through an `Arc`.
                let error = Arc::new(error);
                *supported_versions_guard = Err(error.clone());
                return Err(HttpError::Cached(error));
            }
        };

        // Derive the `SupportedVersions` from a borrow of the response: the
        // response itself is still needed for the state store below.
        let supported_versions = response.as_ref().map(|response| response.supported_versions());

        // Only cache the result if the request was authenticated.
        if self.auth_ctx().has_valid_access_token() {
            // A store failure is only logged: the in-memory cache is still
            // updated and the value still returned.
            if let Err(err) = self
                .state_store()
                .set_kv_data(
                    StateStoreDataKey::SupportedVersions,
                    StateStoreDataValue::SupportedVersions(response),
                )
                .await
            {
                warn!("error when caching supported versions: {err}");
            }

            cached_supported_versions.set_value(supported_versions.clone());
        }

        Ok(supported_versions.into_data())
    }
2300
2301    /// Get the Matrix versions and features supported by the homeserver by
2302    /// fetching them from the cache.
2303    ///
2304    /// For a version of this function that fetches the supported versions and
2305    /// features from the homeserver if the [`SupportedVersions`] aren't
2306    /// found in the cache, take a look at the [`Client::supported_versions()`]
2307    /// method.
2308    ///
2309    /// If the data in the cache has expired, this will trigger a background
2310    /// task to refresh it.
2311    ///
2312    /// # Examples
2313    ///
2314    /// ```no_run
2315    /// use ruma::api::{FeatureFlag, MatrixVersion};
2316    /// # use matrix_sdk::{Client, config::SyncSettings};
2317    /// # use url::Url;
2318    /// # async {
2319    /// # let homeserver = Url::parse("http://localhost:8080")?;
2320    /// # let mut client = Client::new(homeserver).await?;
2321    ///
2322    /// let supported =
2323    ///     if let Some(supported) = client.supported_versions_cached().await? {
2324    ///         supported
2325    ///     } else {
2326    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2327    ///     };
2328    ///
2329    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2330    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2331    ///
2332    /// let msc_x_feature = FeatureFlag::from("msc_x");
2333    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2334    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2335    /// # anyhow::Ok(()) };
2336    /// ```
2337    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2338        self.supported_versions_cached_inner(false).await
2339    }
2340
2341    async fn supported_versions_cached_inner(
2342        &self,
2343        failsafe: bool,
2344    ) -> Result<Option<SupportedVersions>, StoreError> {
2345        let supported_versions_cache = &self.inner.caches.supported_versions;
2346
2347        let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2348            cached
2349        } else if let Some(stored) = self
2350            .state_store()
2351            .get_kv_data(StateStoreDataKey::SupportedVersions)
2352            .await?
2353            .and_then(|value| value.into_supported_versions())
2354        {
2355            let stored = stored.map(|response| response.supported_versions());
2356
2357            // Copy the data from the store in the in-memory cache.
2358            supported_versions_cache.set_value(stored.clone());
2359
2360            stored
2361        } else {
2362            return Ok(None);
2363        };
2364
2365        // Spawn a task to refresh the cache if it has expired and we have a valid
2366        // access token.
2367        if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2368            debug!("spawning task to refresh supported versions cache");
2369
2370            let client = self.clone();
2371            self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2372                if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2373                    warn!("failed to refresh supported versions cache: {error}");
2374                }
2375            });
2376        }
2377
2378        Ok(Some(value.into_data()))
2379    }
2380
2381    /// Get the Matrix versions supported by the homeserver by fetching them
2382    /// from the server or the cache.
2383    ///
2384    /// # Examples
2385    ///
2386    /// ```no_run
2387    /// use ruma::api::MatrixVersion;
2388    /// # use matrix_sdk::{Client, config::SyncSettings};
2389    /// # use url::Url;
2390    /// # async {
2391    /// # let homeserver = Url::parse("http://localhost:8080")?;
2392    /// # let mut client = Client::new(homeserver).await?;
2393    ///
2394    /// let server_versions = client.server_versions().await?;
2395    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2396    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2397    /// # anyhow::Ok(()) };
2398    /// ```
2399    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2400        Ok(self.supported_versions().await?.versions)
2401    }
2402
2403    /// Get the unstable features supported by the homeserver by fetching them
2404    /// from the server or the cache.
2405    ///
2406    /// # Examples
2407    ///
2408    /// ```no_run
2409    /// use matrix_sdk::ruma::api::FeatureFlag;
2410    /// # use matrix_sdk::{Client, config::SyncSettings};
2411    /// # use url::Url;
2412    /// # async {
2413    /// # let homeserver = Url::parse("http://localhost:8080")?;
2414    /// # let mut client = Client::new(homeserver).await?;
2415    ///
2416    /// let msc_x_feature = FeatureFlag::from("msc_x");
2417    /// let unstable_features = client.unstable_features().await?;
2418    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2419    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2420    /// # anyhow::Ok(()) };
2421    /// ```
2422    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2423        Ok(self.supported_versions().await?.features)
2424    }
2425
2426    /// Empty the supported versions and unstable features cache.
2427    ///
2428    /// Since the SDK caches the supported versions, it's possible to have a
2429    /// stale entry in the cache. This functions makes it possible to force
2430    /// reset it.
2431    pub async fn reset_supported_versions(&self) -> Result<()> {
2432        // Empty the in-memory cache.
2433        self.inner.caches.supported_versions.reset();
2434
2435        // Empty the store cache.
2436        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2437    }
2438
2439    /// Get the well-known file of the homeserver from the cache.
2440    ///
2441    /// If the data in the cache has expired, this will trigger a background
2442    /// task to refresh it.
2443    async fn well_known_cached(
2444        &self,
2445    ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2446        let well_known_cache = &self.inner.caches.well_known;
2447
2448        let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2449            cached
2450        } else if let Some(stored) = self
2451            .state_store()
2452            .get_kv_data(StateStoreDataKey::WellKnown)
2453            .await?
2454            .and_then(|value| value.into_well_known())
2455        {
2456            // Copy the data from the store into the in-memory cache.
2457            well_known_cache.set_value(stored.clone());
2458
2459            stored
2460        } else {
2461            return Ok(CachedValue::NotSet);
2462        };
2463
2464        // Spawn a task to refresh the cache if it has expired.
2465        if value.has_expired() {
2466            debug!("spawning task to refresh well-known cache");
2467
2468            let client = self.clone();
2469            self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2470                client.refresh_well_known_cache().await;
2471            });
2472        }
2473
2474        Ok(CachedValue::Cached(value.into_data()))
2475    }
2476
2477    /// Refresh the well-known file of the homeserver in the cache.
2478    async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2479        let well_known_cache = &self.inner.caches.well_known;
2480
2481        let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2482            Ok(guard) => guard,
2483            Err(_) => {
2484                // There is already a refresh in progress, wait for it to finish.
2485                let guard = well_known_cache.refresh_lock.lock().await;
2486
2487                // A refresh can't fail because we ignore failures, so there shouldn't be an
2488                // error in the refresh lock.
2489
2490                // Reuse the data if it was cached and it hasn't expired.
2491                if let CachedValue::Cached(value) = well_known_cache.value()
2492                    && !value.has_expired()
2493                {
2494                    return value.into_data();
2495                }
2496
2497                // The data wasn't cached or has expired, we need to make another request.
2498                guard
2499            }
2500        };
2501
2502        let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2503
2504        if let Err(err) = self
2505            .state_store()
2506            .set_kv_data(
2507                StateStoreDataKey::WellKnown,
2508                StateStoreDataValue::WellKnown(well_known.clone()),
2509            )
2510            .await
2511        {
2512            warn!("error when caching well-known: {err}");
2513        }
2514
2515        well_known_cache.set_value(well_known.clone());
2516
2517        well_known.into_data()
2518    }
2519
2520    /// Get the well-known file of the homeserver by fetching it from the server
2521    /// or the cache.
2522    async fn well_known(&self) -> Option<WellKnownResponse> {
2523        match self.well_known_cached().await {
2524            Ok(CachedValue::Cached(value)) => {
2525                return value;
2526            }
2527            Ok(CachedValue::NotSet) => {
2528                // The cache is empty, make a request.
2529            }
2530            Err(error) => {
2531                warn!("error when loading cached well-known: {error}");
2532                // Fallthrough to make a request.
2533            }
2534        }
2535
2536        self.refresh_well_known_cache().await
2537    }
2538
2539    /// Get information about the homeserver's advertised RTC foci by fetching
2540    /// the well-known file from the server or the cache.
2541    ///
2542    /// # Examples
2543    /// ```no_run
2544    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2545    /// # use url::Url;
2546    /// # async {
2547    /// # let homeserver = Url::parse("http://localhost:8080")?;
2548    /// # let mut client = Client::new(homeserver).await?;
2549    /// let rtc_foci = client.rtc_foci().await?;
2550    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2551    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2552    ///     _ => None,
2553    /// });
2554    /// if let Some(info) = default_livekit_focus_info {
2555    ///     println!("Default LiveKit service URL: {}", info.service_url);
2556    /// }
2557    /// # anyhow::Ok(()) };
2558    /// ```
2559    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2560        let well_known = self.well_known().await;
2561
2562        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2563    }
2564
2565    /// Empty the well-known cache.
2566    ///
2567    /// Since the SDK caches the well-known, it's possible to have a stale entry
2568    /// in the cache. This functions makes it possible to force reset it.
2569    pub async fn reset_well_known(&self) -> Result<()> {
2570        // Empty the in-memory caches.
2571        self.inner.caches.well_known.reset();
2572
2573        // Empty the store cache.
2574        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2575    }
2576
2577    /// Check whether MSC 4028 is enabled on the homeserver.
2578    ///
2579    /// # Examples
2580    ///
2581    /// ```no_run
2582    /// # use matrix_sdk::{Client, config::SyncSettings};
2583    /// # use url::Url;
2584    /// # async {
2585    /// # let homeserver = Url::parse("http://localhost:8080")?;
2586    /// # let mut client = Client::new(homeserver).await?;
2587    /// let msc4028_enabled =
2588    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2589    /// # anyhow::Ok(()) };
2590    /// ```
2591    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2592        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2593    }
2594
2595    /// Get information of all our own devices.
2596    ///
2597    /// # Examples
2598    ///
2599    /// ```no_run
2600    /// # use matrix_sdk::{Client, config::SyncSettings};
2601    /// # use url::Url;
2602    /// # async {
2603    /// # let homeserver = Url::parse("http://localhost:8080")?;
2604    /// # let mut client = Client::new(homeserver).await?;
2605    /// let response = client.devices().await?;
2606    ///
2607    /// for device in response.devices {
2608    ///     println!(
2609    ///         "Device: {} {}",
2610    ///         device.device_id,
2611    ///         device.display_name.as_deref().unwrap_or("")
2612    ///     );
2613    /// }
2614    /// # anyhow::Ok(()) };
2615    /// ```
2616    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2617        let request = get_devices::v3::Request::new();
2618
2619        self.send(request).await
2620    }
2621
2622    /// Delete the given devices from the server.
2623    ///
2624    /// # Arguments
2625    ///
2626    /// * `devices` - The list of devices that should be deleted from the
2627    ///   server.
2628    ///
2629    /// * `auth_data` - This request requires user interactive auth, the first
2630    ///   request needs to set this to `None` and will always fail with an
2631    ///   `UiaaResponse`. The response will contain information for the
2632    ///   interactive auth and the same request needs to be made but this time
2633    ///   with some `auth_data` provided.
2634    ///
2635    /// ```no_run
2636    /// # use matrix_sdk::{
2637    /// #    ruma::{api::client::uiaa, owned_device_id},
2638    /// #    Client, Error, config::SyncSettings,
2639    /// # };
2640    /// # use serde_json::json;
2641    /// # use url::Url;
2642    /// # use std::collections::BTreeMap;
2643    /// # async {
2644    /// # let homeserver = Url::parse("http://localhost:8080")?;
2645    /// # let mut client = Client::new(homeserver).await?;
2646    /// let devices = &[owned_device_id!("DEVICEID")];
2647    ///
2648    /// if let Err(e) = client.delete_devices(devices, None).await {
2649    ///     if let Some(info) = e.as_uiaa_response() {
2650    ///         let mut password = uiaa::Password::new(
2651    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2652    ///             "wordpass".to_owned(),
2653    ///         );
2654    ///         password.session = info.session.clone();
2655    ///
2656    ///         client
2657    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2658    ///             .await?;
2659    ///     }
2660    /// }
    /// # anyhow::Ok(()) };
    /// ```
2662    pub async fn delete_devices(
2663        &self,
2664        devices: &[OwnedDeviceId],
2665        auth_data: Option<uiaa::AuthData>,
2666    ) -> HttpResult<delete_devices::v3::Response> {
2667        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2668        request.auth = auth_data;
2669
2670        self.send(request).await
2671    }
2672
2673    /// Change the display name of a device owned by the current user.
2674    ///
2675    /// Returns a `update_device::Response` which specifies the result
2676    /// of the operation.
2677    ///
2678    /// # Arguments
2679    ///
2680    /// * `device_id` - The ID of the device to change the display name of.
2681    /// * `display_name` - The new display name to set.
2682    pub async fn rename_device(
2683        &self,
2684        device_id: &DeviceId,
2685        display_name: &str,
2686    ) -> HttpResult<update_device::v3::Response> {
2687        let mut request = update_device::v3::Request::new(device_id.to_owned());
2688        request.display_name = Some(display_name.to_owned());
2689
2690        self.send(request).await
2691    }
2692
2693    /// Check whether a device with a specific ID exists on the server.
2694    ///
2695    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2696    /// with 404 and the underlying error otherwise.
2697    ///
2698    /// # Arguments
2699    ///
2700    /// * `device_id` - The ID of the device to query.
2701    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2702        let request = device::get_device::v3::Request::new(device_id);
2703        match self.send(request).await {
2704            Ok(_) => Ok(true),
2705            Err(err) => {
2706                if let Some(error) = err.as_client_api_error()
2707                    && error.status_code == 404
2708                {
2709                    Ok(false)
2710                } else {
2711                    Err(err.into())
2712                }
2713            }
2714        }
2715    }
2716
2717    /// Synchronize the client's state with the latest state on the server.
2718    ///
2719    /// ## Syncing Events
2720    ///
2721    /// Messages or any other type of event need to be periodically fetched from
2722    /// the server, this is achieved by sending a `/sync` request to the server.
2723    ///
2724    /// The first sync is sent out without a [`token`]. The response of the
2725    /// first sync will contain a [`next_batch`] field which should then be
2726    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2727    /// don't receive the same events multiple times.
2728    ///
2729    /// ## Long Polling
2730    ///
2731    /// A sync should in the usual case always be in flight. The
2732    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2733    /// long the server will wait for new events before it will respond.
2734    /// The server will respond immediately if some new events arrive before the
2735    /// timeout has expired. If no changes arrive and the timeout expires an
2736    /// empty sync response will be sent to the client.
2737    ///
2738    /// This method of sending a request that may not receive a response
2739    /// immediately is called long polling.
2740    ///
2741    /// ## Filtering Events
2742    ///
2743    /// The number or type of messages and events that the client should receive
2744    /// from the server can be altered using a [`Filter`].
2745    ///
2746    /// Filters can be non-trivial and, since they will be sent with every sync
2747    /// request, they may take up a bunch of unnecessary bandwidth.
2748    ///
    /// Luckily filters can be uploaded to the server and reused using a unique
2750    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2751    /// method.
2752    ///
2753    /// # Arguments
2754    ///
2755    /// * `sync_settings` - Settings for the sync call, this allows us to set
2756    /// various options to configure the sync:
2757    ///     * [`filter`] - To configure which events we receive and which get
2758    ///       [filtered] by the server
2759    ///     * [`timeout`] - To configure our [long polling] setup.
2760    ///     * [`token`] - To tell the server which events we already received
2761    ///       and where we wish to continue syncing.
2762    ///     * [`full_state`] - To tell the server that we wish to receive all
2763    ///       state events, regardless of our configured [`token`].
2764    ///     * [`set_presence`] - To tell the server to set the presence and to
2765    ///       which state.
2766    ///
2767    /// # Examples
2768    ///
2769    /// ```no_run
2770    /// # use url::Url;
2771    /// # async {
2772    /// # let homeserver = Url::parse("http://localhost:8080")?;
2773    /// # let username = "";
2774    /// # let password = "";
2775    /// use matrix_sdk::{
2776    ///     Client, config::SyncSettings,
2777    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2778    /// };
2779    ///
2780    /// let client = Client::new(homeserver).await?;
2781    /// client.matrix_auth().login_username(username, password).send().await?;
2782    ///
2783    /// // Sync once so we receive the client state and old messages.
2784    /// client.sync_once(SyncSettings::default()).await?;
2785    ///
2786    /// // Register our handler so we start responding once we receive a new
2787    /// // event.
2788    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2789    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2790    /// });
2791    ///
2792    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2793    /// // from our `sync_once()` call automatically.
2794    /// client.sync(SyncSettings::default()).await;
2795    /// # anyhow::Ok(()) };
2796    /// ```
2797    ///
2798    /// [`sync`]: #method.sync
2799    /// [`SyncSettings`]: crate::config::SyncSettings
2800    /// [`token`]: crate::config::SyncSettings#method.token
2801    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2802    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2803    /// [`set_presence`]: ruma::presence::PresenceState
2804    /// [`filter`]: crate::config::SyncSettings#method.filter
2805    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2806    /// [`next_batch`]: SyncResponse#structfield.next_batch
2807    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2808    /// [long polling]: #long-polling
2809    /// [filtered]: #filtering-events
2810    #[instrument(skip(self))]
2811    pub async fn sync_once(
2812        &self,
2813        sync_settings: crate::config::SyncSettings,
2814    ) -> Result<SyncResponse> {
2815        // The sync might not return for quite a while due to the timeout.
2816        // We'll see if there's anything crypto related to send out before we
2817        // sync, i.e. if we closed our client after a sync but before the
2818        // crypto requests were sent out.
2819        //
2820        // This will mostly be a no-op.
2821        #[cfg(feature = "e2e-encryption")]
2822        if let Err(e) = self.send_outgoing_requests().await {
2823            error!(error = ?e, "Error while sending outgoing E2EE requests");
2824        }
2825
2826        let token = match sync_settings.token {
2827            SyncToken::Specific(token) => Some(token),
2828            SyncToken::NoToken => None,
2829            SyncToken::ReusePrevious => self.sync_token().await,
2830        };
2831
2832        let request = assign!(sync_events::v3::Request::new(), {
2833            filter: sync_settings.filter.map(|f| *f),
2834            since: token,
2835            full_state: sync_settings.full_state,
2836            set_presence: sync_settings.set_presence,
2837            timeout: sync_settings.timeout,
2838            use_state_after: true,
2839        });
2840        let mut request_config = self.request_config();
2841        if let Some(timeout) = sync_settings.timeout {
2842            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2843            request_config.timeout = Some(base_timeout + timeout);
2844        }
2845
2846        let response = self.send(request).with_request_config(request_config).await?;
2847        let next_batch = response.next_batch.clone();
2848        let response = self.process_sync(response).await?;
2849
2850        #[cfg(feature = "e2e-encryption")]
2851        if let Err(e) = self.send_outgoing_requests().await {
2852            error!(error = ?e, "Error while sending outgoing E2EE requests");
2853        }
2854
2855        self.inner.sync_beat.notify(usize::MAX);
2856
2857        Ok(SyncResponse::new(next_batch, response))
2858    }
2859
2860    /// Repeatedly synchronize the client state with the server.
2861    ///
2862    /// This method will only return on error, if cancellation is needed
2863    /// the method should be wrapped in a cancelable task or the
2864    /// [`Client::sync_with_callback`] method can be used or
2865    /// [`Client::sync_with_result_callback`] if you want to handle error
2866    /// cases in the loop, too.
2867    ///
2868    /// This method will internally call [`Client::sync_once`] in a loop.
2869    ///
2870    /// This method can be used with the [`Client::add_event_handler`]
2871    /// method to react to individual events. If you instead wish to handle
2872    /// events in a bulk manner the [`Client::sync_with_callback`],
2873    /// [`Client::sync_with_result_callback`] and
2874    /// [`Client::sync_stream`] methods can be used instead. Those methods
2875    /// repeatedly return the whole sync response.
2876    ///
2877    /// # Arguments
2878    ///
2879    /// * `sync_settings` - Settings for the sync call. *Note* that those
2880    ///   settings will be only used for the first sync call. See the argument
2881    ///   docs for [`Client::sync_once`] for more info.
2882    ///
2883    /// # Return
2884    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2885    /// up to the user of the API to check the error and decide whether the sync
2886    /// should continue or not.
2887    ///
2888    /// # Examples
2889    ///
2890    /// ```no_run
2891    /// # use url::Url;
2892    /// # async {
2893    /// # let homeserver = Url::parse("http://localhost:8080")?;
2894    /// # let username = "";
2895    /// # let password = "";
2896    /// use matrix_sdk::{
2897    ///     Client, config::SyncSettings,
2898    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2899    /// };
2900    ///
2901    /// let client = Client::new(homeserver).await?;
2902    /// client.matrix_auth().login_username(&username, &password).send().await?;
2903    ///
2904    /// // Register our handler so we start responding once we receive a new
2905    /// // event.
2906    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2907    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2908    /// });
2909    ///
2910    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2911    /// // automatically.
2912    /// client.sync(SyncSettings::default()).await?;
2913    /// # anyhow::Ok(()) };
2914    /// ```
2915    ///
2916    /// [argument docs]: #method.sync_once
2917    /// [`sync_with_callback`]: #method.sync_with_callback
2918    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2919        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2920    }
2921
2922    /// Repeatedly call sync to synchronize the client state with the server.
2923    ///
2924    /// # Arguments
2925    ///
2926    /// * `sync_settings` - Settings for the sync call. *Note* that those
2927    ///   settings will be only used for the first sync call. See the argument
2928    ///   docs for [`Client::sync_once`] for more info.
2929    ///
    /// * `callback` - A callback that will be called every time a successful
    ///   response has been fetched from the server. The callback must return a
    ///   `LoopCtrl` which signals whether the method should keep syncing. If
    ///   the callback returns `LoopCtrl::Continue` the sync will continue, if
    ///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2935    ///
2936    /// # Return
2937    /// The sync runs until an error occurs or the
2938    /// callback indicates that the Loop should stop. If the callback asked for
2939    /// a regular stop, the result will be `Ok(())` otherwise the
2940    /// `Err(Error)` is returned.
2941    ///
2942    /// # Examples
2943    ///
2944    /// The following example demonstrates how to sync forever while sending all
2945    /// the interesting events through a mpsc channel to another thread e.g. a
2946    /// UI thread.
2947    ///
2948    /// ```no_run
2949    /// # use std::time::Duration;
2950    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2951    /// # use url::Url;
2952    /// # async {
2953    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2954    /// # let mut client = Client::new(homeserver).await.unwrap();
2955    ///
2956    /// use tokio::sync::mpsc::channel;
2957    ///
2958    /// let (tx, rx) = channel(100);
2959    ///
2960    /// let sync_channel = &tx;
2961    /// let sync_settings = SyncSettings::new()
2962    ///     .timeout(Duration::from_secs(30));
2963    ///
2964    /// client
2965    ///     .sync_with_callback(sync_settings, |response| async move {
2966    ///         let channel = sync_channel;
2967    ///         for (room_id, room) in response.rooms.joined {
2968    ///             for event in room.timeline.events {
2969    ///                 channel.send(event).await.unwrap();
2970    ///             }
2971    ///         }
2972    ///
2973    ///         LoopCtrl::Continue
2974    ///     })
2975    ///     .await;
2976    /// };
2977    /// ```
2978    #[instrument(skip_all)]
2979    pub async fn sync_with_callback<C>(
2980        &self,
2981        sync_settings: crate::config::SyncSettings,
2982        callback: impl Fn(SyncResponse) -> C,
2983    ) -> Result<(), Error>
2984    where
2985        C: Future<Output = LoopCtrl>,
2986    {
2987        self.sync_with_result_callback(sync_settings, |result| async {
2988            Ok(callback(result?).await)
2989        })
2990        .await
2991    }
2992
2993    /// Repeatedly call sync to synchronize the client state with the server.
2994    ///
2995    /// # Arguments
2996    ///
2997    /// * `sync_settings` - Settings for the sync call. *Note* that those
2998    ///   settings will be only used for the first sync call. See the argument
2999    ///   docs for [`Client::sync_once`] for more info.
3000    ///
3001    /// * `callback` - A callback that will be called every time after a
3002    ///   response has been received, failure or not. The callback returns a
3003    ///   `Result<LoopCtrl, Error>`, too. When returning
3004    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3005    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3006    ///   function returns `Ok(())`. In case the callback can't handle the
3007    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
3008    ///   which results in the sync ending and the `Err(Error)` being returned.
3009    ///
3010    /// # Return
3011    /// The sync runs until an error occurs that the callback can't handle or
3012    /// the callback indicates that the Loop should stop. If the callback
3013    /// asked for a regular stop, the result will be `Ok(())` otherwise the
3014    /// `Err(Error)` is returned.
3015    ///
    /// _Note_: Lower-level configuration (e.g. for retries) is not changed by
    /// this, and is applied first without sending the result to the
    /// callback. Only after it has been exhausted is the `Result` handed to
    /// the callback.
3020    ///
3021    /// # Examples
3022    ///
3023    /// The following example demonstrates how to sync forever while sending all
3024    /// the interesting events through a mpsc channel to another thread e.g. a
3025    /// UI thread.
3026    ///
3027    /// ```no_run
3028    /// # use std::time::Duration;
3029    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3030    /// # use url::Url;
3031    /// # async {
3032    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3033    /// # let mut client = Client::new(homeserver).await.unwrap();
3034    /// #
3035    /// use tokio::sync::mpsc::channel;
3036    ///
3037    /// let (tx, rx) = channel(100);
3038    ///
3039    /// let sync_channel = &tx;
3040    /// let sync_settings = SyncSettings::new()
3041    ///     .timeout(Duration::from_secs(30));
3042    ///
3043    /// client
3044    ///     .sync_with_result_callback(sync_settings, |response| async move {
3045    ///         let channel = sync_channel;
3046    ///         let sync_response = response?;
3047    ///         for (room_id, room) in sync_response.rooms.joined {
3048    ///              for event in room.timeline.events {
3049    ///                  channel.send(event).await.unwrap();
3050    ///               }
3051    ///         }
3052    ///
3053    ///         Ok(LoopCtrl::Continue)
3054    ///     })
3055    ///     .await;
3056    /// };
3057    /// ```
3058    #[instrument(skip(self, callback))]
3059    pub async fn sync_with_result_callback<C>(
3060        &self,
3061        sync_settings: crate::config::SyncSettings,
3062        callback: impl Fn(Result<SyncResponse, Error>) -> C,
3063    ) -> Result<(), Error>
3064    where
3065        C: Future<Output = Result<LoopCtrl, Error>>,
3066    {
3067        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3068
3069        while let Some(result) = sync_stream.next().await {
3070            trace!("Running callback");
3071            if callback(result).await? == LoopCtrl::Break {
3072                trace!("Callback told us to stop");
3073                break;
3074            }
3075            trace!("Done running callback");
3076        }
3077
3078        Ok(())
3079    }
3080
3081    //// Repeatedly synchronize the client state with the server.
3082    ///
3083    /// This method will internally call [`Client::sync_once`] in a loop and is
3084    /// equivalent to the [`Client::sync`] method but the responses are provided
3085    /// as an async stream.
3086    ///
3087    /// # Arguments
3088    ///
3089    /// * `sync_settings` - Settings for the sync call. *Note* that those
3090    ///   settings will be only used for the first sync call. See the argument
3091    ///   docs for [`Client::sync_once`] for more info.
3092    ///
3093    /// # Examples
3094    ///
3095    /// ```no_run
3096    /// # use url::Url;
3097    /// # async {
3098    /// # let homeserver = Url::parse("http://localhost:8080")?;
3099    /// # let username = "";
3100    /// # let password = "";
3101    /// use futures_util::StreamExt;
3102    /// use matrix_sdk::{Client, config::SyncSettings};
3103    ///
3104    /// let client = Client::new(homeserver).await?;
3105    /// client.matrix_auth().login_username(&username, &password).send().await?;
3106    ///
3107    /// let mut sync_stream =
3108    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
3109    ///
3110    /// while let Some(Ok(response)) = sync_stream.next().await {
3111    ///     for room in response.rooms.joined.values() {
3112    ///         for e in &room.timeline.events {
3113    ///             if let Ok(event) = e.raw().deserialize() {
3114    ///                 println!("Received event {:?}", event);
3115    ///             }
3116    ///         }
3117    ///     }
3118    /// }
3119    ///
3120    /// # anyhow::Ok(()) };
3121    /// ```
3122    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3123    #[instrument(skip(self))]
3124    pub async fn sync_stream(
3125        &self,
3126        mut sync_settings: crate::config::SyncSettings,
3127    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3128        let mut is_first_sync = true;
3129        let mut timeout = None;
3130        let mut last_sync_time: Option<Instant> = None;
3131
3132        let parent_span = Span::current();
3133
3134        async_stream::stream!({
3135            loop {
3136                trace!("Syncing");
3137
3138                if sync_settings.ignore_timeout_on_first_sync {
3139                    if is_first_sync {
3140                        timeout = sync_settings.timeout.take();
3141                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
3142                        sync_settings.timeout = timeout.take();
3143                    }
3144
3145                    is_first_sync = false;
3146                }
3147
3148                yield self
3149                    .sync_loop_helper(&mut sync_settings)
3150                    .instrument(parent_span.clone())
3151                    .await;
3152
3153                Client::delay_sync(&mut last_sync_time).await
3154            }
3155        })
3156    }
3157
3158    /// Get the current, if any, sync token of the client.
3159    /// This will be None if the client didn't sync at least once.
3160    pub(crate) async fn sync_token(&self) -> Option<String> {
3161        self.inner.base_client.sync_token().await
3162    }
3163
3164    /// Gets information about the owner of a given access token.
3165    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3166        let request = whoami::v3::Request::new();
3167        self.send(request).await
3168    }
3169
3170    /// Subscribes a new receiver to client SessionChange broadcasts.
3171    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3172        let broadcast = &self.auth_ctx().session_change_sender;
3173        broadcast.subscribe()
3174    }
3175
3176    /// Sets the save/restore session callbacks.
3177    ///
3178    /// This is another mechanism to get synchronous updates to session tokens,
3179    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3180    pub fn set_session_callbacks(
3181        &self,
3182        reload_session_callback: Box<ReloadSessionCallback>,
3183        save_session_callback: Box<SaveSessionCallback>,
3184    ) -> Result<()> {
3185        self.inner
3186            .auth_ctx
3187            .reload_session_callback
3188            .set(reload_session_callback)
3189            .map_err(|_| Error::MultipleSessionCallbacks)?;
3190
3191        self.inner
3192            .auth_ctx
3193            .save_session_callback
3194            .set(save_session_callback)
3195            .map_err(|_| Error::MultipleSessionCallbacks)?;
3196
3197        Ok(())
3198    }
3199
3200    /// Get the notification settings of the current owner of the client.
3201    pub async fn notification_settings(&self) -> NotificationSettings {
3202        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3203        NotificationSettings::new(self.clone(), ruleset)
3204    }
3205
3206    /// Create a new specialized `Client` that can process notifications.
3207    ///
3208    /// See [`CrossProcessLock::new`] to learn more about
3209    /// `cross_process_lock_config`.
3210    ///
3211    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3212    pub async fn notification_client(
3213        &self,
3214        cross_process_lock_config: CrossProcessLockConfig,
3215    ) -> Result<Client> {
3216        let client = Client {
3217            inner: ClientInner::new(
3218                self.inner.auth_ctx.clone(),
3219                self.server().cloned(),
3220                self.homeserver(),
3221                self.sliding_sync_version(),
3222                self.inner.http_client.clone(),
3223                self.inner
3224                    .base_client
3225                    .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3226                    .await?,
3227                self.inner.caches.supported_versions.value(),
3228                self.inner.caches.well_known.value(),
3229                self.inner.respect_login_well_known,
3230                self.inner.event_cache.clone(),
3231                self.inner.send_queue_data.clone(),
3232                self.inner.latest_events.clone(),
3233                #[cfg(feature = "e2e-encryption")]
3234                self.inner.e2ee.encryption_settings,
3235                #[cfg(feature = "e2e-encryption")]
3236                self.inner.enable_share_history_on_invite,
3237                cross_process_lock_config,
3238                #[cfg(feature = "experimental-search")]
3239                self.inner.search_index.clone(),
3240                self.inner.thread_subscription_catchup.clone(),
3241            )
3242            .await,
3243        };
3244
3245        Ok(client)
3246    }
3247
3248    /// The [`EventCache`] instance for this [`Client`].
3249    pub fn event_cache(&self) -> &EventCache {
3250        // SAFETY: always initialized in the `Client` ctor.
3251        self.inner.event_cache.get().unwrap()
3252    }
3253
3254    /// The [`LatestEvents`] instance for this [`Client`].
3255    pub async fn latest_events(&self) -> &LatestEvents {
3256        self.inner
3257            .latest_events
3258            .get_or_init(|| async {
3259                LatestEvents::new(
3260                    WeakClient::from_client(self),
3261                    self.event_cache().clone(),
3262                    SendQueue::new(self.clone()),
3263                    self.room_info_notable_update_receiver(),
3264                )
3265            })
3266            .await
3267    }
3268
3269    /// Waits until an at least partially synced room is received, and returns
3270    /// it.
3271    ///
3272    /// **Note: this function will loop endlessly until either it finds the room
3273    /// or an externally set timeout happens.**
3274    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3275        loop {
3276            if let Some(room) = self.get_room(room_id) {
3277                if room.is_state_partially_or_fully_synced() {
3278                    debug!("Found just created room!");
3279                    return room;
3280                }
3281                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3282            } else {
3283                debug!("Room wasn't found, waiting for sync beat to try again");
3284            }
3285            self.inner.sync_beat.listen().await;
3286        }
3287    }
3288
3289    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3290    /// join it.
3291    pub async fn knock(
3292        &self,
3293        room_id_or_alias: OwnedRoomOrAliasId,
3294        reason: Option<String>,
3295        server_names: Vec<OwnedServerName>,
3296    ) -> Result<Room> {
3297        let request =
3298            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3299        let response = self.send(request).await?;
3300        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3301        Ok(Room::new(self.clone(), base_room))
3302    }
3303
3304    /// Checks whether the provided `user_id` belongs to an ignored user.
3305    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3306        self.base_client().is_user_ignored(user_id).await
3307    }
3308
3309    /// Gets the `max_upload_size` value from the homeserver, getting either a
3310    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3311    /// missing.
3312    ///
3313    /// Check the spec for more info:
3314    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3315    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3316        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3317        if let Some(data) = max_upload_size_lock.get() {
3318            return Ok(data.to_owned());
3319        }
3320
3321        // Use the authenticated endpoint when the server supports it.
3322        let supported_versions = self.supported_versions().await?;
3323        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3324            .is_supported(&supported_versions);
3325
3326        let upload_size = if use_auth {
3327            self.send(authenticated_media::get_media_config::v1::Request::default())
3328                .await?
3329                .upload_size
3330        } else {
3331            #[allow(deprecated)]
3332            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3333        };
3334
3335        match max_upload_size_lock.set(upload_size) {
3336            Ok(_) => Ok(upload_size),
3337            Err(error) => {
3338                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3339            }
3340        }
3341    }
3342
3343    /// The settings to use for decrypting events.
3344    #[cfg(feature = "e2e-encryption")]
3345    pub fn decryption_settings(&self) -> &DecryptionSettings {
3346        &self.base_client().decryption_settings
3347    }
3348
3349    /// Returns the [`SearchIndex`] for this [`Client`].
3350    #[cfg(feature = "experimental-search")]
3351    pub fn search_index(&self) -> &SearchIndex {
3352        &self.inner.search_index
3353    }
3354
3355    /// Whether the client is configured to take thread subscriptions (MSC4306
3356    /// and MSC4308) into account, and the server enabled the experimental
3357    /// feature flag for it.
3358    ///
3359    /// This may cause filtering out of thread subscriptions, and loading the
3360    /// thread subscriptions via the sliding sync extension, when the room
3361    /// list service is being used.
3362    ///
3363    /// This is async and fallible as it may use the network to retrieve the
3364    /// server supported features, if they aren't cached already.
3365    pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3366        // Check if the client is configured to support thread subscriptions first.
3367        match self.base_client().threading_support {
3368            ThreadingSupport::Enabled { with_subscriptions: false }
3369            | ThreadingSupport::Disabled => return Ok(false),
3370            ThreadingSupport::Enabled { with_subscriptions: true } => {}
3371        }
3372
3373        // Now, let's check that the server supports it.
3374        let server_enabled = self
3375            .supported_versions()
3376            .await?
3377            .features
3378            .contains(&FeatureFlag::from("org.matrix.msc4306"));
3379
3380        Ok(server_enabled)
3381    }
3382
3383    /// Fetch thread subscriptions changes between `from` and up to `to`.
3384    ///
3385    /// The `limit` optional parameter can be used to limit the number of
3386    /// entries in a response. It can also be overridden by the server, if
3387    /// it's deemed too large.
3388    pub async fn fetch_thread_subscriptions(
3389        &self,
3390        from: Option<String>,
3391        to: Option<String>,
3392        limit: Option<UInt>,
3393    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3394        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3395            from,
3396            to,
3397            limit,
3398        });
3399        Ok(self.send(request).await?)
3400    }
3401
3402    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3403        self.inner.thread_subscription_catchup.get().unwrap()
3404    }
3405
3406    /// Pause the client for background suspension.
3407    ///
3408    /// This method:
3409    /// 1. Disables all send queues (prevents new message sends).
3410    /// 2. Pauses all database stores, waiting for in-flight operations and
3411    ///    releasing all connections and file locks.
3412    ///
3413    /// Call [`Client::resume()`] when the app returns to the foreground.
3414    ///
3415    /// # iOS
3416    ///
3417    /// Call this before the app is suspended to avoid `0xdead10cc` kills.
3418    /// Typically called from
3419    /// [`applicationDidEnterBackground`](https://developer.apple.com/documentation/uikit/uiapplicationdelegate/applicationdidenterbackground(_:))
3420    /// or an equivalent SwiftUI lifecycle event, *after* stopping the
3421    /// `matrix_sdk_ui::sync_service::SyncService`.
3422    pub async fn pause(&self) -> Result<()> {
3423        info!("Client::pause — releasing database resources");
3424
3425        // Disable send queues so no new sends hit the stores.
3426        self.send_queue().set_enabled(false).await;
3427
3428        // Close all stores (waits for in-flight ops, closes connections).
3429        self.base_client().close_stores().await?;
3430
3431        info!("Client::pause — complete, all database connections released");
3432        Ok(())
3433    }
3434
3435    /// Resume the client after a [`Client::pause()`].
3436    ///
3437    /// Re-acquires store resources and re-enables send queues.
3438    ///
3439    /// If your app stopped the `matrix_sdk_ui::sync_service::SyncService`
3440    /// before pausing, restart it separately as appropriate for your app
3441    /// lifecycle.
3442    pub async fn resume(&self) -> Result<()> {
3443        info!("Client::resume — re-acquiring database resources");
3444
3445        // Reopen stores (creates new connection pools).
3446        self.base_client().reopen_stores().await?;
3447
3448        // Re-enable send queues.
3449        self.send_queue().set_enabled(true).await;
3450
3451        info!("Client::resume — complete");
3452        Ok(())
3453    }
3454
3455    /// Perform database optimizations if any are available, i.e. vacuuming in
3456    /// SQLite.
3457    ///
3458    /// **Warning:** this was added to check if SQLite fragmentation was the
3459    /// source of performance issues, **DO NOT use in production**.
3460    #[doc(hidden)]
3461    pub async fn optimize_stores(&self) -> Result<()> {
3462        trace!("Optimizing state store...");
3463        self.state_store().optimize().await?;
3464
3465        trace!("Optimizing event cache store...");
3466        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3467            clean_lock.optimize().await?;
3468        }
3469
3470        trace!("Optimizing media store...");
3471        self.media_store().lock().await?.optimize().await?;
3472
3473        Ok(())
3474    }
3475
3476    /// Returns the sizes of the existing stores, if known.
3477    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3478        #[cfg(feature = "e2e-encryption")]
3479        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3480            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3481        {
3482            Some(store_size)
3483        } else {
3484            None
3485        };
3486        #[cfg(not(feature = "e2e-encryption"))]
3487        let crypto_store_size = None;
3488
3489        let state_store_size = self.state_store().get_size().await.ok().flatten();
3490
3491        let event_cache_store_size = if let Some(clean_lock) =
3492            self.event_cache_store().lock().await?.as_clean()
3493            && let Ok(Some(store_size)) = clean_lock.get_size().await
3494        {
3495            Some(store_size)
3496        } else {
3497            None
3498        };
3499
3500        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3501
3502        Ok(StoreSizes {
3503            crypto_store: crypto_store_size,
3504            state_store: state_store_size,
3505            event_cache_store: event_cache_store_size,
3506            media_store: media_store_size,
3507        })
3508    }
3509
3510    /// Get a reference to the client's task monitor, for spawning background
3511    /// tasks.
3512    pub fn task_monitor(&self) -> &TaskMonitor {
3513        &self.inner.task_monitor
3514    }
3515
3516    /// Add a subscriber for duplicate key upload error notifications triggered
3517    /// by requests to /keys/upload.
3518    #[cfg(feature = "e2e-encryption")]
3519    pub fn subscribe_to_duplicate_key_upload_errors(
3520        &self,
3521    ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3522        self.inner.duplicate_key_upload_error_sender.subscribe()
3523    }
3524
3525    /// Check the record of whether we are waiting for an [MSC4268] key bundle
3526    /// for the given room.
3527    ///
3528    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3529    #[cfg(feature = "e2e-encryption")]
3530    pub async fn get_pending_key_bundle_details_for_room(
3531        &self,
3532        room_id: &RoomId,
3533    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3534        Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3535    }
3536
3537    /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3538    /// a DM.
3539    pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3540        &self.inner.base_client.dm_room_definition
3541    }
3542}
3543
/// Contains the disk size of the different stores, if known. It won't be
/// available for in-memory stores.
///
/// The default value has every size set to `None`, and two values compare
/// equal when all four sizes are equal.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreSizes {
    /// The size of the CryptoStore.
    pub crypto_store: Option<usize>,
    /// The size of the StateStore.
    pub state_store: Option<usize>,
    /// The size of the EventCacheStore.
    pub event_cache_store: Option<usize>,
    /// The size of the MediaStore.
    pub media_store: Option<usize>,
}
3557
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper which marks the given users as tracked by the crypto
    /// layer.
    ///
    /// # Panics
    ///
    /// Panics if there is no Olm machine, or if updating the tracked users
    /// fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let olm_guard = self.olm_machine().await;
        let olm_machine = olm_guard.as_ref().unwrap();
        olm_machine.update_tracked_users(user_ids).await.unwrap();
    }
}
3571
3572/// A weak reference to the inner client, useful when trying to get a handle
3573/// on the owning client.
3574#[derive(Clone, Debug)]
3575pub(crate) struct WeakClient {
3576    client: Weak<ClientInner>,
3577}
3578
3579impl WeakClient {
3580    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3581    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3582        Self { client: Arc::downgrade(client) }
3583    }
3584
3585    /// Construct a [`WeakClient`] from a [`Client`].
3586    pub fn from_client(client: &Client) -> Self {
3587        Self::from_inner(&client.inner)
3588    }
3589
3590    /// Attempts to get a [`Client`] from this [`WeakClient`].
3591    pub fn get(&self) -> Option<Client> {
3592        self.client.upgrade().map(|inner| Client { inner })
3593    }
3594
3595    /// Gets the number of strong (`Arc`) pointers still pointing to this
3596    /// client.
3597    #[allow(dead_code)]
3598    pub fn strong_count(&self) -> usize {
3599        self.client.strong_count()
3600    }
3601}
3602
3603/// Information about the state of a room before we joined it.
3604#[derive(Debug, Clone, Default)]
3605struct PreJoinRoomInfo {
3606    /// The user who invited us to the room, if any.
3607    pub inviter: Option<RoomMember>,
3608}
3609
3610// The http mocking library is not supported for wasm32
3611#[cfg(all(test, not(target_family = "wasm")))]
3612pub(crate) mod tests {
3613    use std::{sync::Arc, time::Duration};
3614
3615    use assert_matches::assert_matches;
3616    use assert_matches2::assert_let;
3617    use eyeball::SharedObservable;
3618    use futures_util::{FutureExt, StreamExt, pin_mut};
3619    use js_int::{UInt, uint};
3620    use matrix_sdk_base::{
3621        RoomState,
3622        store::{MemoryStore, StoreConfig},
3623        ttl::TtlValue,
3624    };
3625    use matrix_sdk_test::{
3626        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3627        event_factory::EventFactory,
3628    };
3629    #[cfg(target_family = "wasm")]
3630    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3631
3632    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3633    use ruma::{
3634        RoomId, ServerName, UserId,
3635        api::{
3636            FeatureFlag, MatrixVersion,
3637            client::{
3638                discovery::discover_homeserver::RtcFocusInfo,
3639                room::create_room::v3::Request as CreateRoomRequest,
3640            },
3641        },
3642        assign,
3643        events::{
3644            ignored_user_list::IgnoredUserListEventContent,
3645            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3646        },
3647        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3648    };
3649    use serde_json::json;
3650    use stream_assert::{assert_next_matches, assert_pending};
3651    use tokio::{
3652        spawn,
3653        time::{sleep, timeout},
3654    };
3655    use url::Url;
3656
3657    use super::Client;
3658    use crate::{
3659        Error, TransmissionProgress,
3660        client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3661        config::{RequestConfig, SyncSettings},
3662        futures::SendRequest,
3663        media::MediaError,
3664        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3665    };
3666
3667    #[async_test]
3668    async fn test_account_data() {
3669        let server = MatrixMockServer::new().await;
3670        let client = server.client_builder().build().await;
3671
3672        let f = EventFactory::new();
3673        server
3674            .mock_sync()
3675            .ok_and_run(&client, |builder| {
3676                builder.add_global_account_data(
3677                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3678                );
3679            })
3680            .await;
3681
3682        let content = client
3683            .account()
3684            .account_data::<IgnoredUserListEventContent>()
3685            .await
3686            .unwrap()
3687            .unwrap()
3688            .deserialize()
3689            .unwrap();
3690
3691        assert_eq!(content.ignored_users.len(), 1);
3692    }
3693
3694    #[async_test]
3695    async fn test_successful_discovery() {
3696        // Imagine this is `matrix.org`.
3697        let server = MatrixMockServer::new().await;
3698        let server_url = server.uri();
3699
3700        // Imagine this is `matrix-client.matrix.org`.
3701        let homeserver = MatrixMockServer::new().await;
3702        let homeserver_url = homeserver.uri();
3703
3704        // Imagine Alice has the user ID `@alice:matrix.org`.
3705        let domain = server_url.strip_prefix("http://").unwrap();
3706        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3707
3708        // The `.well-known` is on the server (e.g. `matrix.org`).
3709        server
3710            .mock_well_known()
3711            .ok_with_homeserver_url(&homeserver_url)
3712            .mock_once()
3713            .named("well-known")
3714            .mount()
3715            .await;
3716
3717        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3718        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3719
3720        let client = Client::builder()
3721            .insecure_server_name_no_tls(alice.server_name())
3722            .build()
3723            .await
3724            .unwrap();
3725
3726        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3727        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3728        client.server_versions().await.unwrap();
3729    }
3730
3731    #[async_test]
3732    async fn test_discovery_broken_server() {
3733        let server = MatrixMockServer::new().await;
3734        let server_url = server.uri();
3735        let domain = server_url.strip_prefix("http://").unwrap();
3736        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3737
3738        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3739
3740        assert!(
3741            Client::builder()
3742                .insecure_server_name_no_tls(alice.server_name())
3743                .build()
3744                .await
3745                .is_err(),
3746            "Creating a client from a user ID should fail when the .well-known request fails."
3747        );
3748    }
3749
3750    #[async_test]
3751    async fn test_room_creation() {
3752        let server = MatrixMockServer::new().await;
3753        let client = server.client_builder().build().await;
3754
3755        let f = EventFactory::new().sender(user_id!("@example:localhost"));
3756        server
3757            .mock_sync()
3758            .ok_and_run(&client, |builder| {
3759                builder.add_joined_room(
3760                    JoinedRoomBuilder::default()
3761                        .add_state_event(
3762                            f.member(user_id!("@example:localhost")).display_name("example"),
3763                        )
3764                        .add_state_event(f.default_power_levels()),
3765                );
3766            })
3767            .await;
3768
3769        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3770        assert_eq!(room.state(), RoomState::Joined);
3771    }
3772
3773    #[async_test]
3774    async fn test_retry_limit_http_requests() {
3775        let server = MatrixMockServer::new().await;
3776        let client = server
3777            .client_builder()
3778            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3779            .build()
3780            .await;
3781
3782        assert!(client.request_config().retry_limit.unwrap() == 4);
3783
3784        server.mock_who_am_i().error500().expect(4).mount().await;
3785
3786        client.whoami().await.unwrap_err();
3787    }
3788
3789    #[async_test]
3790    async fn test_retry_timeout_http_requests() {
3791        // Keep this timeout small so that the test doesn't take long
3792        let retry_timeout = Duration::from_secs(5);
3793        let server = MatrixMockServer::new().await;
3794        let client = server
3795            .client_builder()
3796            .on_builder(|builder| {
3797                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3798            })
3799            .build()
3800            .await;
3801
3802        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3803
3804        server.mock_login().error500().expect(2..).mount().await;
3805
3806        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3807    }
3808
3809    #[async_test]
3810    async fn test_short_retry_initial_http_requests() {
3811        let server = MatrixMockServer::new().await;
3812        let client = server
3813            .client_builder()
3814            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3815            .build()
3816            .await;
3817
3818        server.mock_login().error500().expect(3..).mount().await;
3819
3820        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3821    }
3822
3823    #[async_test]
3824    async fn test_no_retry_http_requests() {
3825        let server = MatrixMockServer::new().await;
3826        let client = server.client_builder().build().await;
3827
3828        server.mock_devices().error500().mock_once().mount().await;
3829
3830        client.devices().await.unwrap_err();
3831    }
3832
3833    #[async_test]
3834    async fn test_set_homeserver() {
3835        let client = MockClientBuilder::new(None).build().await;
3836        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3837
3838        let homeserver = Url::parse("http://example.com/").unwrap();
3839        client.set_homeserver(homeserver.clone());
3840        assert_eq!(client.homeserver(), homeserver);
3841    }
3842
3843    #[async_test]
3844    async fn test_search_user_request() {
3845        let server = MatrixMockServer::new().await;
3846        let client = server.client_builder().build().await;
3847
3848        server.mock_user_directory().ok().mock_once().mount().await;
3849
3850        let response = client.search_users("test", 50).await.unwrap();
3851        assert_eq!(response.results.len(), 1);
3852        let result = &response.results[0];
3853        assert_eq!(result.user_id.to_string(), "@test:example.me");
3854        assert_eq!(result.display_name.clone().unwrap(), "Test");
3855        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3856        assert!(!response.limited);
3857    }
3858
3859    #[async_test]
3860    async fn test_request_unstable_features() {
3861        let server = MatrixMockServer::new().await;
3862        let client = server.client_builder().no_server_versions().build().await;
3863
3864        server
3865            .mock_versions()
3866            .with_feature("org.matrix.e2e_cross_signing", true)
3867            .ok()
3868            .mock_once()
3869            .mount()
3870            .await;
3871
3872        let unstable_features = client.unstable_features().await.unwrap();
3873        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3874        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3875    }
3876
3877    #[async_test]
3878    async fn test_can_homeserver_push_encrypted_event_to_device() {
3879        let server = MatrixMockServer::new().await;
3880        let client = server.client_builder().no_server_versions().build().await;
3881
3882        server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3883
3884        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3885        assert!(msc4028_enabled);
3886    }
3887
3888    #[async_test]
3889    async fn test_recently_visited_rooms() {
3890        // Tracking recently visited rooms requires authentication
3891        let client = MockClientBuilder::new(None).unlogged().build().await;
3892        assert_matches!(
3893            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3894            Err(Error::AuthenticationRequired)
3895        );
3896
3897        let client = MockClientBuilder::new(None).build().await;
3898        let account = client.account();
3899
3900        // We should start off with an empty list
3901        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3902
3903        // Tracking a valid room id should add it to the list
3904        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3905        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3906        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3907
3908        // And the existing list shouldn't be changed
3909        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3910        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3911
3912        // Tracking the same room again shouldn't change the list
3913        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3914        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3915        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3916
3917        // Tracking a second room should add it to the front of the list
3918        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3919        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3920        assert_eq!(
3921            account.get_recently_visited_rooms().await.unwrap(),
3922            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3923        );
3924
3925        // Tracking the first room yet again should move it to the front of the list
3926        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3927        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3928        assert_eq!(
3929            account.get_recently_visited_rooms().await.unwrap(),
3930            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3931        );
3932
3933        // Tracking should be capped at 20
3934        for n in 0..20 {
3935            account
3936                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3937                .await
3938                .unwrap();
3939        }
3940
3941        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3942
3943        // And the initial rooms should've been pushed out
3944        let rooms = account.get_recently_visited_rooms().await.unwrap();
3945        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3946        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3947
3948        // And the last tracked room should be the first
3949        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3950    }
3951
3952    #[async_test]
3953    async fn test_client_no_cycle_with_event_cache() {
3954        let client = MockClientBuilder::new(None).build().await;
3955
3956        // Wait for the init tasks to die.
3957        sleep(Duration::from_secs(1)).await;
3958
3959        let weak_client = WeakClient::from_client(&client);
3960        assert_eq!(weak_client.strong_count(), 1);
3961
3962        {
3963            let room_id = room_id!("!room:example.org");
3964
3965            // Have the client know the room.
3966            let response = SyncResponseBuilder::default()
3967                .add_joined_room(JoinedRoomBuilder::new(room_id))
3968                .build_sync_response();
3969            client.inner.base_client.receive_sync_response(response).await.unwrap();
3970
3971            client.event_cache().subscribe().unwrap();
3972
3973            let (_room_event_cache, _drop_handles) =
3974                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3975        }
3976
3977        drop(client);
3978
3979        // Give a bit of time for background tasks to die.
3980        sleep(Duration::from_secs(1)).await;
3981
3982        // The weak client must be the last reference to the client now.
3983        assert_eq!(weak_client.strong_count(), 0);
3984        let client = weak_client.get();
3985        assert!(
3986            client.is_none(),
3987            "too many strong references to the client: {}",
3988            Arc::strong_count(&client.unwrap().inner)
3989        );
3990    }
3991
3992    #[async_test]
3993    async fn test_supported_versions_caching() {
3994        let server = MatrixMockServer::new().await;
3995
3996        let versions_mock = server
3997            .mock_versions()
3998            .expect_default_access_token()
3999            .with_feature("org.matrix.e2e_cross_signing", true)
4000            .ok()
4001            .named("first versions mock")
4002            .expect(1)
4003            .mount_as_scoped()
4004            .await;
4005
4006        let memory_store = Arc::new(MemoryStore::new());
4007        let client = server
4008            .client_builder()
4009            .no_server_versions()
4010            .on_builder(|builder| {
4011                builder.store_config(
4012                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4013                        .state_store(memory_store.clone()),
4014                )
4015            })
4016            .build()
4017            .await;
4018
4019        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4020
4021        // The result was cached.
4022        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
4023        // This subsequent call hits the in-memory cache.
4024        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4025
4026        drop(client);
4027
4028        let client = server
4029            .client_builder()
4030            .no_server_versions()
4031            .on_builder(|builder| {
4032                builder.store_config(
4033                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4034                        .state_store(memory_store.clone()),
4035                )
4036            })
4037            .build()
4038            .await;
4039
4040        // These calls to the new client hit the on-disk cache.
4041        assert!(
4042            client
4043                .unstable_features()
4044                .await
4045                .unwrap()
4046                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
4047        );
4048
4049        let supported = client.supported_versions().await.unwrap();
4050        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4051        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4052
4053        // Then this call hits the in-memory cache.
4054        let supported = client.supported_versions().await.unwrap();
4055        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4056        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4057
4058        drop(versions_mock);
4059
4060        // Now, reset the cache, and observe the endpoint being called again once.
4061        client.reset_supported_versions().await.unwrap();
4062
4063        server.mock_versions().ok().expect(2).named("second versions mock").mount().await;
4064
4065        // Hits network again.
4066        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4067        // Hits in-memory cache again.
4068        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4069        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4070
4071        // Force an expiry of the data.
4072        let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
4073        let mut ttl_value = TtlValue::new(supported_versions);
4074        ttl_value.expire();
4075        client.inner.caches.supported_versions.set_value(ttl_value);
4076
4077        // Call the method to trigger a cache refresh background task.
4078        client.supported_versions_cached().await.unwrap().unwrap();
4079
4080        // We wait for the task to finish, the endpoint should have been called again.
4081        sleep(Duration::from_secs(1)).await;
4082        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4083    }
4084
4085    #[async_test]
4086    async fn test_well_known_caching() {
4087        let server = MatrixMockServer::new().await;
4088        let server_url = server.uri();
4089        let domain = server_url.strip_prefix("http://").unwrap();
4090        let server_name = <&ServerName>::try_from(domain).unwrap();
4091        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
4092
4093        let well_known_mock = server
4094            .mock_well_known()
4095            .ok()
4096            .named("well known mock")
4097            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
4098            .mount_as_scoped()
4099            .await;
4100
4101        let memory_store = Arc::new(MemoryStore::new());
4102        let client = Client::builder()
4103            .insecure_server_name_no_tls(server_name)
4104            .store_config(
4105                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4106                    .state_store(memory_store.clone()),
4107            )
4108            .build()
4109            .await
4110            .unwrap();
4111
4112        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4113
4114        // This subsequent call hits the in-memory cache.
4115        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4116
4117        drop(client);
4118
4119        let client = server
4120            .client_builder()
4121            .no_server_versions()
4122            .on_builder(|builder| {
4123                builder.store_config(
4124                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4125                        .state_store(memory_store.clone()),
4126                )
4127            })
4128            .build()
4129            .await;
4130
4131        // This call to the new client hits the on-disk cache.
4132        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4133
4134        // Then this call hits the in-memory cache.
4135        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4136
4137        drop(well_known_mock);
4138
4139        // Now, reset the cache, and observe the endpoints being called again once.
4140        client.reset_well_known().await.unwrap();
4141
4142        server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
4143
4144        // Hits network again.
4145        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4146        // Hits in-memory cache again.
4147        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4148
4149        // Force an expiry of the data.
4150        let well_known = client.well_known().await;
4151        let mut ttl_value = TtlValue::new(well_known);
4152        ttl_value.expire();
4153        client.inner.caches.well_known.set_value(ttl_value);
4154
4155        // Call the method again to trigger a cache refresh background task.
4156        client.well_known().await;
4157
4158        // We wait for the task to finish, the endpoint should have been called again.
4159        // We need to wait a bit because the first requests using the server name of the
4160        // user will fail, only the requests using the homeserver URL will succeed.
4161        sleep(Duration::from_secs(5)).await;
4162        assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
4163    }
4164
4165    #[async_test]
4166    async fn test_missing_well_known_caching() {
4167        let server = MatrixMockServer::new().await;
4168        let rtc_foci: Vec<RtcFocusInfo> = vec![];
4169
4170        let well_known_mock = server
4171            .mock_well_known()
4172            .error_unrecognized()
4173            .named("first well-known mock")
4174            .expect(1)
4175            .mount_as_scoped()
4176            .await;
4177
4178        let memory_store = Arc::new(MemoryStore::new());
4179        let client = server
4180            .client_builder()
4181            .on_builder(|builder| {
4182                builder.store_config(
4183                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4184                        .state_store(memory_store.clone()),
4185                )
4186            })
4187            .build()
4188            .await;
4189
4190        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4191
4192        // This subsequent call hits the in-memory cache.
4193        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4194
4195        drop(client);
4196
4197        let client = server
4198            .client_builder()
4199            .on_builder(|builder| {
4200                builder.store_config(
4201                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4202                        .state_store(memory_store.clone()),
4203                )
4204            })
4205            .build()
4206            .await;
4207
4208        // This call to the new client hits the on-disk cache.
4209        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4210
4211        // Then this call hits the in-memory cache.
4212        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4213
4214        drop(well_known_mock);
4215
4216        // Now, reset the cache, and observe the endpoints being called again once.
4217        client.reset_well_known().await.unwrap();
4218
4219        server
4220            .mock_well_known()
4221            .error_unrecognized()
4222            .expect(1)
4223            .named("second well-known mock")
4224            .mount()
4225            .await;
4226
4227        // Hits network again.
4228        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4229        // Hits in-memory cache again.
4230        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4231    }
4232
4233    #[async_test]
4234    async fn test_no_network_doesnt_cause_infinite_retries() {
4235        // We want infinite retries for transient errors.
4236        let client = MockClientBuilder::new(None)
4237            .on_builder(|builder| builder.request_config(RequestConfig::new()))
4238            .build()
4239            .await;
4240
4241        // We don't define a mock server on purpose here, so that the error is really a
4242        // network error.
4243        client.whoami().await.unwrap_err();
4244    }
4245
4246    #[async_test]
4247    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4248        let server = MatrixMockServer::new().await;
4249        let client = server.client_builder().build().await;
4250
4251        let room_id = room_id!("!room:example.org");
4252
4253        server
4254            .mock_sync()
4255            .ok_and_run(&client, |builder| {
4256                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4257            })
4258            .await;
4259
4260        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4261        assert_eq!(room.room_id(), room_id);
4262    }
4263
4264    #[async_test]
4265    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4266        let server = MatrixMockServer::new().await;
4267        let client = server.client_builder().build().await;
4268
4269        let room_id = room_id!("!room:example.org");
4270
4271        let client = Arc::new(client);
4272
4273        // Perform the /sync request with a delay so it starts after the
4274        // `await_room_remote_echo` call has happened
4275        spawn({
4276            let client = client.clone();
4277            async move {
4278                sleep(Duration::from_millis(100)).await;
4279
4280                server
4281                    .mock_sync()
4282                    .ok_and_run(&client, |builder| {
4283                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4284                    })
4285                    .await;
4286            }
4287        });
4288
4289        let room =
4290            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4291        assert_eq!(room.room_id(), room_id);
4292    }
4293
4294    #[async_test]
4295    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4296        let client = MockClientBuilder::new(None).build().await;
4297
4298        let room_id = room_id!("!room:example.org");
4299        // Room is not present so the client won't be able to find it. The call will
4300        // timeout.
4301        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4302    }
4303
4304    #[async_test]
4305    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4306        let server = MatrixMockServer::new().await;
4307        let client = server.client_builder().build().await;
4308
4309        server.mock_create_room().ok().mount().await;
4310
4311        // Create a room in the internal store
4312        let room = client
4313            .create_room(assign!(CreateRoomRequest::new(), {
4314                invite: vec![],
4315                is_direct: false,
4316            }))
4317            .await
4318            .unwrap();
4319
4320        // Room is locally present, but not synced, the call will timeout
4321        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4322            .await
4323            .unwrap_err();
4324    }
4325
4326    #[async_test]
4327    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4328        let server = MatrixMockServer::new().await;
4329        let client = server.client_builder().build().await;
4330
4331        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4332
4333        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4334        assert_matches!(ret, Ok(true));
4335    }
4336
4337    #[async_test]
4338    async fn test_is_room_alias_available_if_alias_is_resolved() {
4339        let server = MatrixMockServer::new().await;
4340        let client = server.client_builder().build().await;
4341
4342        server
4343            .mock_room_directory_resolve_alias()
4344            .ok("!some_room_id:matrix.org", Vec::new())
4345            .expect(1)
4346            .mount()
4347            .await;
4348
4349        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4350        assert_matches!(ret, Ok(false));
4351    }
4352
4353    #[async_test]
4354    async fn test_is_room_alias_available_if_error_found() {
4355        let server = MatrixMockServer::new().await;
4356        let client = server.client_builder().build().await;
4357
4358        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4359
4360        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4361        assert_matches!(ret, Err(_));
4362    }
4363
4364    #[async_test]
4365    async fn test_create_room_alias() {
4366        let server = MatrixMockServer::new().await;
4367        let client = server.client_builder().build().await;
4368
4369        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4370
4371        let ret = client
4372            .create_room_alias(
4373                room_alias_id!("#some_alias:matrix.org"),
4374                room_id!("!some_room:matrix.org"),
4375            )
4376            .await;
4377        assert_matches!(ret, Ok(()));
4378    }
4379
4380    #[async_test]
4381    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4382        let server = MatrixMockServer::new().await;
4383        let client = server.client_builder().build().await;
4384
4385        let room_id = room_id!("!a-room:matrix.org");
4386
4387        // Make sure the summary endpoint is called once
4388        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4389
4390        // We create a locally cached invited room
4391        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4392
4393        // And we get a preview, the server endpoint was reached
4394        let preview = client
4395            .get_room_preview(room_id.into(), Vec::new())
4396            .await
4397            .expect("Room preview should be retrieved");
4398
4399        assert_eq!(invited_room.room_id(), preview.room_id);
4400    }
4401
4402    #[async_test]
4403    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4404        let server = MatrixMockServer::new().await;
4405        let client = server.client_builder().build().await;
4406
4407        let room_id = room_id!("!a-room:matrix.org");
4408
4409        // Make sure the summary endpoint is called once
4410        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4411
4412        // We create a locally cached left room
4413        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4414
4415        // And we get a preview, the server endpoint was reached
4416        let preview = client
4417            .get_room_preview(room_id.into(), Vec::new())
4418            .await
4419            .expect("Room preview should be retrieved");
4420
4421        assert_eq!(left_room.room_id(), preview.room_id);
4422    }
4423
4424    #[async_test]
4425    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4426        let server = MatrixMockServer::new().await;
4427        let client = server.client_builder().build().await;
4428
4429        let room_id = room_id!("!a-room:matrix.org");
4430
4431        // Make sure the summary endpoint is called once
4432        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4433
4434        // We create a locally cached knocked room
4435        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4436
4437        // And we get a preview, the server endpoint was reached
4438        let preview = client
4439            .get_room_preview(room_id.into(), Vec::new())
4440            .await
4441            .expect("Room preview should be retrieved");
4442
4443        assert_eq!(knocked_room.room_id(), preview.room_id);
4444    }
4445
4446    #[async_test]
4447    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4448        let server = MatrixMockServer::new().await;
4449        let client = server.client_builder().build().await;
4450
4451        let room_id = room_id!("!a-room:matrix.org");
4452
4453        // Make sure the summary endpoint is not called
4454        server.mock_room_summary().ok(room_id).never().mount().await;
4455
4456        // We create a locally cached joined room
4457        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4458
4459        // And we get a preview, no server endpoint was reached
4460        let preview = client
4461            .get_room_preview(room_id.into(), Vec::new())
4462            .await
4463            .expect("Room preview should be retrieved");
4464
4465        assert_eq!(joined_room.room_id(), preview.room_id);
4466    }
4467
4468    #[async_test]
4469    async fn test_media_preview_config() {
4470        let server = MatrixMockServer::new().await;
4471        let client = server.client_builder().build().await;
4472
4473        server
4474            .mock_sync()
4475            .ok_and_run(&client, |builder| {
4476                builder.add_custom_global_account_data(json!({
4477                    "content": {
4478                        "media_previews": "private",
4479                        "invite_avatars": "off"
4480                    },
4481                    "type": "m.media_preview_config"
4482                }));
4483            })
4484            .await;
4485
4486        let (initial_value, stream) =
4487            client.account().observe_media_preview_config().await.unwrap();
4488
4489        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4490        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4491        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4492        pin_mut!(stream);
4493        assert_pending!(stream);
4494
4495        server
4496            .mock_sync()
4497            .ok_and_run(&client, |builder| {
4498                builder.add_custom_global_account_data(json!({
4499                    "content": {
4500                        "media_previews": "off",
4501                        "invite_avatars": "on"
4502                    },
4503                    "type": "m.media_preview_config"
4504                }));
4505            })
4506            .await;
4507
4508        assert_next_matches!(
4509            stream,
4510            MediaPreviewConfigEventContent {
4511                media_previews: Some(MediaPreviews::Off),
4512                invite_avatars: Some(InviteAvatars::On),
4513                ..
4514            }
4515        );
4516        assert_pending!(stream);
4517    }
4518
4519    #[async_test]
4520    async fn test_unstable_media_preview_config() {
4521        let server = MatrixMockServer::new().await;
4522        let client = server.client_builder().build().await;
4523
4524        server
4525            .mock_sync()
4526            .ok_and_run(&client, |builder| {
4527                builder.add_custom_global_account_data(json!({
4528                    "content": {
4529                        "media_previews": "private",
4530                        "invite_avatars": "off"
4531                    },
4532                    "type": "io.element.msc4278.media_preview_config"
4533                }));
4534            })
4535            .await;
4536
4537        let (initial_value, stream) =
4538            client.account().observe_media_preview_config().await.unwrap();
4539
4540        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4541        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4542        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4543        pin_mut!(stream);
4544        assert_pending!(stream);
4545
4546        server
4547            .mock_sync()
4548            .ok_and_run(&client, |builder| {
4549                builder.add_custom_global_account_data(json!({
4550                    "content": {
4551                        "media_previews": "off",
4552                        "invite_avatars": "on"
4553                    },
4554                    "type": "io.element.msc4278.media_preview_config"
4555                }));
4556            })
4557            .await;
4558
4559        assert_next_matches!(
4560            stream,
4561            MediaPreviewConfigEventContent {
4562                media_previews: Some(MediaPreviews::Off),
4563                invite_avatars: Some(InviteAvatars::On),
4564                ..
4565            }
4566        );
4567        assert_pending!(stream);
4568    }
4569
4570    #[async_test]
4571    async fn test_media_preview_config_not_found() {
4572        let server = MatrixMockServer::new().await;
4573        let client = server.client_builder().build().await;
4574
4575        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4576
4577        assert!(initial_value.is_none());
4578    }
4579
4580    #[async_test]
4581    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4582        // The default Matrix version we use is 1.11 or higher, so authenticated media
4583        // is supported.
4584        let server = MatrixMockServer::new().await;
4585        let client = server.client_builder().build().await;
4586
4587        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4588
4589        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4590        client.load_or_fetch_max_upload_size().await.unwrap();
4591
4592        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4593    }
4594
4595    #[async_test]
4596    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4597        // The server must advertise support for the stable feature for authenticated
4598        // media support, so we mock the `GET /versions` response.
4599        let server = MatrixMockServer::new().await;
4600        let client = server.client_builder().no_server_versions().build().await;
4601
4602        server
4603            .mock_versions()
4604            .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4605            .with_feature("org.matrix.msc3916.stable", true)
4606            .ok()
4607            .named("versions")
4608            .expect(1)
4609            .mount()
4610            .await;
4611
4612        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4613
4614        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4615        client.load_or_fetch_max_upload_size().await.unwrap();
4616
4617        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4618    }
4619
4620    #[async_test]
4621    async fn test_load_or_fetch_max_upload_size_no_auth() {
4622        // The server must not support Matrix 1.11 or higher for unauthenticated
4623        // media requests, so we mock the `GET /versions` response.
4624        let server = MatrixMockServer::new().await;
4625        let client = server.client_builder().no_server_versions().build().await;
4626
4627        server
4628            .mock_versions()
4629            .with_versions(vec!["v1.1"])
4630            .ok()
4631            .named("versions")
4632            .expect(1)
4633            .mount()
4634            .await;
4635
4636        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4637
4638        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4639        client.load_or_fetch_max_upload_size().await.unwrap();
4640
4641        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4642    }
4643
4644    #[async_test]
4645    async fn test_uploading_a_too_large_media_file() {
4646        let server = MatrixMockServer::new().await;
4647        let client = server.client_builder().build().await;
4648
4649        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4650        client.load_or_fetch_max_upload_size().await.unwrap();
4651        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4652
4653        let data = vec![1, 2];
4654        let upload_request =
4655            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4656        let request = SendRequest {
4657            client: client.clone(),
4658            request: upload_request,
4659            config: None,
4660            send_progress: SharedObservable::new(TransmissionProgress::default()),
4661        };
4662        let media_request = SendMediaUploadRequest::new(request);
4663
4664        let error = media_request.await.err();
4665        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4666        assert_eq!(max, uint!(1));
4667        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4668    }
4669
4670    #[async_test]
4671    async fn test_dont_ignore_timeout_on_first_sync() {
4672        let server = MatrixMockServer::new().await;
4673        let client = server.client_builder().build().await;
4674
4675        server
4676            .mock_sync()
4677            .timeout(Some(Duration::from_secs(30)))
4678            .ok(|_| {})
4679            .mock_once()
4680            .named("sync_with_timeout")
4681            .mount()
4682            .await;
4683
4684        // Call the endpoint once to check the timeout.
4685        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4686
4687        timeout(Duration::from_secs(1), async {
4688            stream.next().await.unwrap().unwrap();
4689        })
4690        .await
4691        .unwrap();
4692    }
4693
4694    #[async_test]
4695    async fn test_ignore_timeout_on_first_sync() {
4696        let server = MatrixMockServer::new().await;
4697        let client = server.client_builder().build().await;
4698
4699        server
4700            .mock_sync()
4701            .timeout(None)
4702            .ok(|_| {})
4703            .mock_once()
4704            .named("sync_no_timeout")
4705            .mount()
4706            .await;
4707        server
4708            .mock_sync()
4709            .timeout(Some(Duration::from_secs(30)))
4710            .ok(|_| {})
4711            .mock_once()
4712            .named("sync_with_timeout")
4713            .mount()
4714            .await;
4715
4716        // Call each version of the endpoint once to check the timeouts.
4717        let mut stream = Box::pin(
4718            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4719        );
4720
4721        timeout(Duration::from_secs(1), async {
4722            stream.next().await.unwrap().unwrap();
4723            stream.next().await.unwrap().unwrap();
4724        })
4725        .await
4726        .unwrap();
4727    }
4728
4729    #[async_test]
4730    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4731        let server = MatrixMockServer::new().await;
4732        let client = server.client_builder().build().await;
4733        // This is the user ID that is inside MemberAdditional.
4734        // Note the confusing username, so we can share
4735        // GlobalAccountDataTestEvent::Direct with the invited test.
4736        let user_id = user_id!("@invited:localhost");
4737
4738        // When we receive a sync response saying "invited" is invited to a DM
4739        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4740        let response = SyncResponseBuilder::default()
4741            .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4742            .add_global_account_data(
4743                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4744            )
4745            .build_sync_response();
4746        client.base_client().receive_sync_response(response).await.unwrap();
4747
4748        // Then get_dm_room finds this room
4749        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4750        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4751    }
4752
4753    #[async_test]
4754    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4755        let server = MatrixMockServer::new().await;
4756        let client = server.client_builder().build().await;
4757        // This is the user ID that is inside MemberInvite
4758        let user_id = user_id!("@invited:localhost");
4759
4760        // When we receive a sync response saying "invited" is invited to a DM
4761        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4762        let response = SyncResponseBuilder::default()
4763            .add_joined_room(
4764                JoinedRoomBuilder::default()
4765                    .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4766            )
4767            .add_global_account_data(
4768                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4769            )
4770            .build_sync_response();
4771        client.base_client().receive_sync_response(response).await.unwrap();
4772
4773        // Then get_dm_room finds this room
4774        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4775        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4776    }
4777
4778    #[async_test]
4779    async fn test_get_dm_room_still_finds_left_room() {
4780        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4781        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4782
4783        let server = MatrixMockServer::new().await;
4784        let client = server.client_builder().build().await;
4785        // This is the user ID that is inside MemberAdditional.
4786        // Note the confusing username, so we can share
4787        // GlobalAccountDataTestEvent::Direct with the invited test.
4788        let user_id = user_id!("@invited:localhost");
4789
4790        // When we receive a sync response saying "invited" has left a DM
4791        let f = EventFactory::new().sender(user_id);
4792        let response = SyncResponseBuilder::default()
4793            .add_joined_room(
4794                JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4795            )
4796            .add_global_account_data(
4797                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4798            )
4799            .build_sync_response();
4800        client.base_client().receive_sync_response(response).await.unwrap();
4801
4802        // Then get_dm_room finds this room
4803        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4804        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4805    }
4806
4807    #[async_test]
4808    async fn test_device_exists() {
4809        let server = MatrixMockServer::new().await;
4810        let client = server.client_builder().build().await;
4811
4812        server.mock_get_device().ok().expect(1).mount().await;
4813
4814        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4815    }
4816
4817    #[async_test]
4818    async fn test_device_exists_404() {
4819        let server = MatrixMockServer::new().await;
4820        let client = server.client_builder().build().await;
4821
4822        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4823    }
4824
4825    #[async_test]
4826    async fn test_device_exists_500() {
4827        let server = MatrixMockServer::new().await;
4828        let client = server.client_builder().build().await;
4829
4830        server.mock_get_device().error500().expect(1).mount().await;
4831
4832        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4833    }
4834
4835    #[async_test]
4836    async fn test_fetching_well_known_with_homeserver_url() {
4837        let server = MatrixMockServer::new().await;
4838        let client = server.client_builder().build().await;
4839        server.mock_well_known().ok().mount().await;
4840
4841        assert_matches!(client.fetch_client_well_known().await, Some(_));
4842    }
4843
4844    #[async_test]
4845    async fn test_fetching_well_known_with_server_name() {
4846        let server = MatrixMockServer::new().await;
4847        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4848
4849        server.mock_well_known().ok().mount().await;
4850
4851        let client = MockClientBuilder::new(None)
4852            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4853            .build()
4854            .await;
4855
4856        assert_matches!(client.fetch_client_well_known().await, Some(_));
4857    }
4858
4859    #[async_test]
4860    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4861        let server = MatrixMockServer::new().await;
4862        server.mock_well_known().ok().mount().await;
4863
4864        let user_id =
4865            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4866        let client = MockClientBuilder::new(None)
4867            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4868            .build()
4869            .await;
4870
4871        assert_matches!(client.fetch_client_well_known().await, Some(_));
4872    }
4873}