Skip to main content

matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32    DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35    BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36    SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37    SyncOutsideWasm, ThreadingSupport,
38    event_cache::store::EventCacheStoreLock,
39    media::store::MediaStoreLock,
40    store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41    sync::{Notification, RoomUpdates},
42    task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50    api::{
51        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52        client::{
53            account::whoami,
54            alias::{create_alias, delete_alias, get_alias},
55            authenticated_media,
56            device::{self, delete_devices, get_devices, update_device},
57            directory::{get_public_rooms, get_public_rooms_filtered},
58            discovery::{
59                discover_homeserver::{self, RtcFocusInfo},
60                get_supported_versions,
61            },
62            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
63            knock::knock_room,
64            media,
65            membership::{join_room_by_id, join_room_by_id_or_alias},
66            room::create_room,
67            session::login::v3::DiscoveryInfo,
68            sync::sync_events,
69            threads::get_thread_subscriptions_changes,
70            uiaa,
71            user_directory::search_users,
72        },
73        error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
74        path_builder::PathBuilder,
75    },
76    assign,
77    events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
78    push::Ruleset,
79    time::Instant,
80};
81use serde::de::DeserializeOwned;
82use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
83use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
84use url::Url;
85
86use self::{
87    caches::{Cache, CachedValue, ClientCaches},
88    futures::SendRequest,
89};
90use crate::{
91    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
92    Room, SessionTokens, TransmissionProgress,
93    authentication::{
94        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
95        oauth::OAuth,
96    },
97    client::{
98        homeserver_capabilities::HomeserverCapabilities,
99        thread_subscriptions::ThreadSubscriptionCatchup,
100    },
101    config::{RequestConfig, SyncToken},
102    deduplicating_handler::DeduplicatingHandler,
103    error::HttpResult,
104    event_cache::EventCache,
105    event_handler::{
106        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
107        EventHandlerStore, ObservableEventHandler, SyncEvent,
108    },
109    http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
110    latest_events::LatestEvents,
111    live_locations_observer::BeaconInfoUpdate,
112    media::MediaError,
113    notification_settings::NotificationSettings,
114    room::RoomMember,
115    room_preview::RoomPreview,
116    send_queue::{SendQueue, SendQueueData},
117    sliding_sync::Version as SlidingSyncVersion,
118    sync::{RoomUpdate, SyncResponse},
119};
120#[cfg(feature = "e2e-encryption")]
121use crate::{
122    cross_process_lock::CrossProcessLock,
123    encryption::{
124        DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
125        VerificationState,
126    },
127};
128
129mod builder;
130pub(crate) mod caches;
131pub(crate) mod futures;
132pub(crate) mod homeserver_capabilities;
133pub(crate) mod thread_subscriptions;
134
135pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
136#[cfg(feature = "experimental-search")]
137use crate::search_index::SearchIndex;
138
139#[cfg(not(target_family = "wasm"))]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
141#[cfg(target_family = "wasm")]
142type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
143
144#[cfg(not(target_family = "wasm"))]
145type NotificationHandlerFn =
146    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
147#[cfg(target_family = "wasm")]
148type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
149
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method, the return value
/// of the provided callback controls if the sync loop should be exited.
///
/// [`sync_with_callback`]: Client::sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
163
/// Represents changes that can occur to a [`Client`]'s session.
#[derive(Debug, Clone, PartialEq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    ///
    /// Carries the [`UnknownTokenErrorData`] describing the unknown-token
    /// error.
    UnknownToken(UnknownTokenErrorData),
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
172
/// Information about the server vendor obtained from the federation API.
///
/// Returned by `Client::server_vendor_info`, which substitutes the string
/// `"unknown"` for any field the homeserver does not report.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The name of the server software.
    pub server_name: String,
    /// The version of the server software.
    pub version: String,
}
182
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely:
/// a clone is another handle onto the same shared inner state.
#[derive(Clone)]
pub struct Client {
    /// The shared, reference-counted state backing this handle.
    pub(crate) inner: Arc<ClientInner>,
}
190
/// Locks and request-deduplicating handlers shared by the methods of a
/// [`Client`].
///
/// All fields are constructed through `Default`.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see `send_single_receipt`.
    ///
    /// NOTE(review): this previously linked to `Self::send_single_receipt`,
    /// which cannot resolve because `ClientLocks` has no such method; the
    /// intended target is presumably `Room::send_single_receipt` — confirm.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock over the crypto store.
    ///
    /// Stored in a `OnceCell`, so it starts out unset and can be initialized
    /// at most once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
269
/// The state shared by every clone of a [`Client`].
///
/// A [`Client`] is only an `Arc` around this struct; see
/// [`ClientInner::new`] for how it is assembled.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API. It sits behind a
    /// lock because it can be replaced after construction (see
    /// `Client::set_homeserver`).
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    ///
    /// It sits behind a lock because it can be overridden after construction
    /// (see [`Client::set_sliding_sync_version`]).
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process lock configuration.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
    /// [`CrossProcessLockConfig::MultiProcess`] is used.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_lock_config: CrossProcessLockConfig,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    #[cfg(feature = "experimental-search")]
    /// Handler for [`RoomIndex`]'s of each room
    search_index: SearchIndex,

    /// A monitor for background tasks spawned by the client.
    pub(crate) task_monitor: TaskMonitor,

    /// A sender to notify subscribers about duplicate key upload errors
    /// triggered by requests to /keys/upload.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) duplicate_key_upload_error_sender:
        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
}
398
399impl ClientInner {
400    /// Create a new `ClientInner`.
401    ///
402    /// All the fields passed as parameters here are those that must be cloned
403    /// upon instantiation of a sub-client, e.g. a client specialized for
404    /// notifications.
405    #[allow(clippy::too_many_arguments)]
406    async fn new(
407        auth_ctx: Arc<AuthCtx>,
408        server: Option<Url>,
409        homeserver: Url,
410        sliding_sync_version: SlidingSyncVersion,
411        http_client: HttpClient,
412        base_client: BaseClient,
413        supported_versions: CachedValue<TtlValue<SupportedVersions>>,
414        well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
415        respect_login_well_known: bool,
416        event_cache: OnceCell<EventCache>,
417        send_queue: Arc<SendQueueData>,
418        latest_events: OnceCell<LatestEvents>,
419        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
420        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
421        cross_process_lock_config: CrossProcessLockConfig,
422        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
423        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
424    ) -> Arc<Self> {
425        let caches = ClientCaches {
426            supported_versions: Cache::with_value(supported_versions),
427            well_known: Cache::with_value(well_known),
428            server_metadata: Cache::new(),
429            homeserver_capabilities: Cache::new(),
430        };
431
432        let client = Self {
433            server,
434            homeserver: StdRwLock::new(homeserver),
435            auth_ctx,
436            sliding_sync_version: StdRwLock::new(sliding_sync_version),
437            http_client,
438            base_client,
439            caches,
440            locks: Default::default(),
441            cross_process_lock_config,
442            typing_notice_times: Default::default(),
443            event_handlers: Default::default(),
444            notification_handlers: Default::default(),
445            room_update_channels: Default::default(),
446            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
447            // ballast for all observers to catch up.
448            room_updates_sender: broadcast::Sender::new(32),
449            respect_login_well_known,
450            sync_beat: event_listener::Event::new(),
451            event_cache,
452            send_queue_data: send_queue,
453            latest_events,
454            #[cfg(feature = "e2e-encryption")]
455            e2ee: EncryptionData::new(encryption_settings),
456            #[cfg(feature = "e2e-encryption")]
457            verification_state: SharedObservable::new(VerificationState::Unknown),
458            #[cfg(feature = "e2e-encryption")]
459            enable_share_history_on_invite,
460            server_max_upload_size: Mutex::new(OnceCell::new()),
461            #[cfg(feature = "experimental-search")]
462            search_index: search_index_handler,
463            thread_subscription_catchup,
464            task_monitor: TaskMonitor::new(),
465            #[cfg(feature = "e2e-encryption")]
466            duplicate_key_upload_error_sender: broadcast::channel(1).0,
467        };
468
469        #[allow(clippy::let_and_return)]
470        let client = Arc::new(client);
471
472        #[cfg(feature = "e2e-encryption")]
473        client.e2ee.initialize_tasks(&client);
474
475        let _ = client
476            .event_cache
477            .get_or_init(|| async {
478                EventCache::new(&client, client.base_client.event_cache_store().clone())
479            })
480            .await;
481
482        let _ = client
483            .thread_subscription_catchup
484            .get_or_init(|| async {
485                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
486            })
487            .await;
488
489        client
490    }
491}
492
493#[cfg(not(tarpaulin_include))]
494impl Debug for Client {
495    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
496        write!(fmt, "Client")
497    }
498}
499
500impl Client {
    /// Create a new [`Client`] that will use the given homeserver.
    ///
    /// This is a shorthand for [`Client::builder()`] followed by setting the
    /// homeserver URL and building the client.
    ///
    /// # Arguments
    ///
    /// * `homeserver_url` - The homeserver that the client should connect to.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientBuildError`] if building the client fails.
    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
        Self::builder().homeserver_url(homeserver_url).build().await
    }
509
    /// Returns a subscriber that publishes an event every time the ignore user
    /// list changes.
    ///
    /// The subscription is provided by the underlying `BaseClient`.
    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
        self.inner.base_client.subscribe_to_ignore_user_list_changes()
    }
515
    /// Create a new [`ClientBuilder`], used to configure and build a
    /// [`Client`].
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
520
    /// Borrow the [`BaseClient`] held by this client's shared inner state.
    pub(crate) fn base_client(&self) -> &BaseClient {
        &self.inner.base_client
    }
524
    /// The underlying HTTP client.
    ///
    /// This is the `reqwest::Client` wrapped by the SDK's internal HTTP
    /// client.
    pub fn http_client(&self) -> &reqwest::Client {
        &self.inner.http_client.inner
    }
529
    /// Borrow the collection of locks and deduplicating handlers shared by
    /// the methods of this client.
    pub(crate) fn locks(&self) -> &ClientLocks {
        &self.inner.locks
    }
533
    /// Borrow the authentication and authorization data of this client.
    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
        &self.inner.auth_ctx
    }
537
    /// The cross-process store lock configuration used by this [`Client`].
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
    /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
    /// the value used for all cross-process store locks used by this
    /// `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this value
    /// must be different for each of them.
    pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
        &self.inner.cross_process_lock_config
    }
548
549    /// Change the homeserver URL used by this client.
550    ///
551    /// # Arguments
552    ///
553    /// * `homeserver_url` - The new URL to use.
554    fn set_homeserver(&self, homeserver_url: Url) {
555        *self.inner.homeserver.write().unwrap() = homeserver_url;
556    }
557
558    /// Change to a different homeserver and re-resolve well-known.
559    #[cfg(feature = "e2e-encryption")]
560    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
561        &self,
562        homeserver_url: Url,
563    ) -> Result<()> {
564        self.set_homeserver(homeserver_url);
565        self.reset_well_known().await?;
566        if let Some(well_known) = self.well_known().await {
567            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
568        }
569        Ok(())
570    }
571
    /// Retrieves a helper component to access the [`HomeserverCapabilities`]
    /// supported or disabled by the homeserver.
    ///
    /// The helper is built from a clone of this client.
    pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
        HomeserverCapabilities::new(self.clone())
    }
577
578    /// Get the server vendor information from the federation API.
579    ///
580    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
581    /// both the server's software name and version.
582    ///
583    /// # Examples
584    ///
585    /// ```no_run
586    /// # use matrix_sdk::Client;
587    /// # use url::Url;
588    /// # async {
589    /// # let homeserver = Url::parse("http://example.com")?;
590    /// let client = Client::new(homeserver).await?;
591    ///
592    /// let server_info = client.server_vendor_info(None).await?;
593    /// println!(
594    ///     "Server: {}, Version: {}",
595    ///     server_info.server_name, server_info.version
596    /// );
597    /// # anyhow::Ok(()) };
598    /// ```
599    #[cfg(feature = "federation-api")]
600    pub async fn server_vendor_info(
601        &self,
602        request_config: Option<RequestConfig>,
603    ) -> HttpResult<ServerVendorInfo> {
604        use ruma::api::federation::discovery::get_server_version;
605
606        let res = self
607            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
608            .await?;
609
610        // Extract server info, using defaults if fields are missing.
611        let server = res.server.unwrap_or_default();
612        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
613        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
614
615        Ok(ServerVendorInfo { server_name: server_name_str, version })
616    }
617
    /// Get a copy of the default request config.
    ///
    /// The default request config is what's used when sending requests if no
    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
    /// function with such a parameter.
    ///
    /// If the default request config was not customized through
    /// [`ClientBuilder`] when creating this `Client`, the returned value will
    /// be equivalent to [`RequestConfig::default()`].
    ///
    /// The value is returned by copy, so changing it does not affect this
    /// `Client`.
    pub fn request_config(&self) -> RequestConfig {
        self.inner.http_client.request_config
    }
630
    /// Check whether the client has been activated.
    ///
    /// A client is considered active when:
    ///
    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
    ///    is logged in,
    /// 2. Has loaded cached data from storage,
    /// 3. If encryption is enabled, it also initialized or restored its
    ///    `OlmMachine`.
    ///
    /// The check itself is delegated to the underlying `BaseClient`.
    pub fn is_active(&self) -> bool {
        self.inner.base_client.is_active()
    }
643
    /// The server used by the client.
    ///
    /// Not to be confused with the [homeserver](Self::homeserver): the server
    /// is usually the server part of a user ID (e.g. `matrix.org` in
    /// `@mnt_io:matrix.org`).
    ///
    /// Returns `None` when the `Client` has been built from a homeserver URL
    /// directly, in which case the server is not known.
    pub fn server(&self) -> Option<&Url> {
        self.inner.server.as_ref()
    }
650
651    /// The homeserver of the client.
652    pub fn homeserver(&self) -> Url {
653        self.inner.homeserver.read().unwrap().clone()
654    }
655
656    /// Get the sliding sync version.
657    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
658        self.inner.sliding_sync_version.read().unwrap().clone()
659    }
660
661    /// Override the sliding sync version.
662    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
663        let mut lock = self.inner.sliding_sync_version.write().unwrap();
664        *lock = version;
665    }
666
    /// Get the Matrix user session meta information.
    ///
    /// If the client is currently logged in, this will return a
    /// [`SessionMeta`] object which contains the user ID and device ID.
    /// Otherwise it returns `None`.
    ///
    /// See also [`Client::user_id`] and [`Client::device_id`], which read
    /// their value from this object.
    pub fn session_meta(&self) -> Option<&SessionMeta> {
        self.base_client().session_meta()
    }
675
    /// Returns a receiver that gets events for each room info update. To watch
    /// for new events, use `receiver.resubscribe()`.
    ///
    /// The receiver is obtained from the underlying `BaseClient`.
    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
        self.base_client().room_info_notable_update_receiver()
    }
681
682    /// Performs a search for users.
683    /// The search is performed case-insensitively on user IDs and display names
684    ///
685    /// # Arguments
686    ///
687    /// * `search_term` - The search term for the search
688    /// * `limit` - The maximum number of results to return. Defaults to 10.
689    ///
690    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
691    pub async fn search_users(
692        &self,
693        search_term: &str,
694        limit: u64,
695    ) -> HttpResult<search_users::v3::Response> {
696        let mut request = search_users::v3::Request::new(search_term.to_owned());
697
698        if let Some(limit) = UInt::new(limit) {
699            request.limit = limit;
700        }
701
702        self.send(request).await
703    }
704
705    /// Get the user id of the current owner of the client.
706    pub fn user_id(&self) -> Option<&UserId> {
707        self.session_meta().map(|s| s.user_id.as_ref())
708    }
709
710    /// Get the device ID that identifies the current session.
711    pub fn device_id(&self) -> Option<&DeviceId> {
712        self.session_meta().map(|s| s.device_id.as_ref())
713    }
714
    /// Get the current access token for this session.
    ///
    /// Will be `None` if the client has not been logged in.
    ///
    /// The token is read from the client's authentication context.
    pub fn access_token(&self) -> Option<String> {
        self.auth_ctx().access_token()
    }
721
    /// Get the current tokens for this session.
    ///
    /// To be notified of changes in the session tokens, use
    /// [`Client::subscribe_to_session_changes()`] or
    /// [`Client::set_session_callbacks()`].
    ///
    /// Returns `None` if the client has not been logged in.
    ///
    /// The tokens are read from the client's authentication context.
    pub fn session_tokens(&self) -> Option<SessionTokens> {
        self.auth_ctx().session_tokens()
    }
732
733    /// Access the authentication API used to log in this client.
734    ///
735    /// Will be `None` if the client has not been logged in.
736    pub fn auth_api(&self) -> Option<AuthApi> {
737        match self.auth_ctx().auth_data.get()? {
738            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
739            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
740        }
741    }
742
743    /// Get the whole session info of this client.
744    ///
745    /// Will be `None` if the client has not been logged in.
746    ///
747    /// Can be used with [`Client::restore_session`] to restore a previously
748    /// logged-in session.
749    pub fn session(&self) -> Option<AuthSession> {
750        match self.auth_api()? {
751            AuthApi::Matrix(api) => api.session().map(Into::into),
752            AuthApi::OAuth(api) => api.full_session().map(Into::into),
753        }
754    }
755
    /// Get a reference to the state store.
    ///
    /// The store is the one held by the underlying `BaseClient`.
    pub fn state_store(&self) -> &DynStateStore {
        self.base_client().state_store()
    }
760
    /// Get a reference to the event cache store.
    ///
    /// The store is the one held by the underlying `BaseClient`.
    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
        self.base_client().event_cache_store()
    }
765
    /// Get a reference to the media store.
    ///
    /// The store is the one held by the underlying `BaseClient`.
    pub fn media_store(&self) -> &MediaStoreLock {
        self.base_client().media_store()
    }
770
    /// Access the native Matrix authentication API with this client.
    ///
    /// The returned [`MatrixAuth`] is built from a clone of this client.
    pub fn matrix_auth(&self) -> MatrixAuth {
        MatrixAuth::new(self.clone())
    }
775
    /// Get the account of the current owner of the client.
    ///
    /// The returned [`Account`] is built from a clone of this client.
    pub fn account(&self) -> Account {
        Account::new(self.clone())
    }
780
    /// Get the encryption manager of the client.
    ///
    /// The returned [`Encryption`] is built from a clone of this client. Only
    /// available when the `e2e-encryption` feature is enabled.
    #[cfg(feature = "e2e-encryption")]
    pub fn encryption(&self) -> Encryption {
        Encryption::new(self.clone())
    }
786
787    /// Get the media manager of the client.
788    pub fn media(&self) -> Media {
789        Media::new(self.clone())
790    }
791
792    /// Get the pusher manager of the client.
793    pub fn pusher(&self) -> Pusher {
794        Pusher::new(self.clone())
795    }
796
797    /// Access the OAuth 2.0 API of the client.
798    pub fn oauth(&self) -> OAuth {
799        OAuth::new(self.clone())
800    }
801
802    /// Register a handler for a specific event type.
803    ///
804    /// The handler is a function or closure with one or more arguments. The
805    /// first argument is the event itself. All additional arguments are
806    /// "context" arguments: They have to implement [`EventHandlerContext`].
807    /// This trait is named that way because most of the types implementing it
808    /// give additional context about an event: The room it was in, its raw form
809    /// and other similar things. As two exceptions to this,
810    /// [`Client`] and [`EventHandlerHandle`] also implement the
811    /// `EventHandlerContext` trait so you don't have to clone your client
812    /// into the event handler manually and a handler can decide to remove
813    /// itself.
814    ///
815    /// Some context arguments are not universally applicable. A context
816    /// argument that isn't available for the given event type will result in
817    /// the event handler being skipped and an error being logged. The following
818    /// context argument types are only available for a subset of event types:
819    ///
820    /// * [`Room`] is only available for room-specific events, i.e. not for
821    ///   events like global account data events or presence events.
822    ///
823    /// You can provide custom context via
824    /// [`add_event_handler_context`](Client::add_event_handler_context) and
825    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
826    /// into the event handler.
827    ///
828    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
829    ///
830    /// # Examples
831    ///
832    /// ```no_run
833    /// use matrix_sdk::{
834    ///     deserialized_responses::EncryptionInfo,
835    ///     event_handler::Ctx,
836    ///     ruma::{
837    ///         events::{
838    ///             macros::EventContent,
839    ///             push_rules::PushRulesEvent,
840    ///             room::{
841    ///                 message::SyncRoomMessageEvent,
842    ///                 topic::SyncRoomTopicEvent,
843    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
844    ///             },
845    ///         },
846    ///         push::Action,
847    ///         Int, MilliSecondsSinceUnixEpoch,
848    ///     },
849    ///     Client, Room,
850    /// };
851    /// use serde::{Deserialize, Serialize};
852    ///
853    /// # async fn example(client: Client) {
854    /// client.add_event_handler(
855    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
856    ///         // Common usage: Room event plus room and client.
857    ///     },
858    /// );
859    /// client.add_event_handler(
860    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
861    ///         async move {
862    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
863    ///             // unencrypted events and events that were decrypted by the SDK.
864    ///         }
865    ///     },
866    /// );
867    /// client.add_event_handler(
868    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
869    ///         async move {
870    ///             // A `Vec<Action>` parameter allows you to know which push actions
871    ///             // are applicable for an event. For example, an event with
872    ///             // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
873    ///             // should be highlighted in the timeline.
874    ///         }
875    ///     },
876    /// );
877    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
878    ///     // You can omit any or all arguments after the first.
879    /// });
880    ///
881    /// // Registering a temporary event handler:
882    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
883    ///     /* Event handler */
884    /// });
885    /// client.remove_event_handler(handle);
886    ///
887    /// // Registering custom event handler context:
888    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
889    /// struct MyContext {
890    ///     number: usize,
891    /// }
892    /// client.add_event_handler_context(MyContext { number: 5 });
893    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
894    ///     // Use the context
895    /// });
896    ///
897    /// // This will handle membership events in joined rooms. Invites are special, see below.
898    /// client.add_event_handler(
899    ///     |ev: SyncRoomMemberEvent| async move {},
900    /// );
901    ///
902    /// // To handle state events in invited rooms (including invite membership events),
903    /// // `StrippedRoomMemberEvent` should be used.
904    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
905    /// client.add_event_handler(
906    ///     |ev: StrippedRoomMemberEvent| async move {},
907    /// );
908    ///
909    /// // Custom events work exactly the same way, you just need to declare
910    /// // the content struct and use the EventContent derive macro on it.
911    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
912    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
913    /// struct TokenEventContent {
914    ///     token: String,
915    ///     #[serde(rename = "exp")]
916    ///     expires_at: MilliSecondsSinceUnixEpoch,
917    /// }
918    ///
919    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
920    ///     todo!("Display the token");
921    /// });
922    ///
923    /// // Event handler closures can also capture local variables.
924    /// // Make sure they are cheap to clone though, because they will be cloned
925    /// // every time the closure is called.
926    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
927    ///
928    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
929    ///     println!("Calling the handler with identifier {data}");
930    /// });
931    /// # }
932    /// ```
933    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
934    where
935        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
936        H: EventHandler<Ev, Ctx>,
937    {
938        self.add_event_handler_impl(handler, None)
939    }
940
941    /// Register a handler for a specific room, and event type.
942    ///
943    /// This method works the same way as
944    /// [`add_event_handler`][Self::add_event_handler], except that the handler
945    /// will only be called for events in the room with the specified ID. See
946    /// that method for more details on event handler functions.
947    ///
948    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
949    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
950    /// your use case.
951    pub fn add_room_event_handler<Ev, Ctx, H>(
952        &self,
953        room_id: &RoomId,
954        handler: H,
955    ) -> EventHandlerHandle
956    where
957        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
958        H: EventHandler<Ev, Ctx>,
959    {
960        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
961    }
962
963    /// Observe a specific event type.
964    ///
965    /// `Ev` represents the kind of event that will be observed. `Ctx`
966    /// represents the context that will come with the event. It relies on the
967    /// same mechanism as [`Client::add_event_handler`]. The main difference is
968    /// that it returns an [`ObservableEventHandler`] and doesn't require a
969    /// user-defined closure. It is possible to subscribe to the
970    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
971    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
972    /// Ctx)`.
973    ///
974    /// Be careful that only the most recent value can be observed. Subscribers
975    /// are notified when a new value is sent, but there is no guarantee
976    /// that they will see all values.
977    ///
978    /// # Example
979    ///
980    /// Let's see a classical usage:
981    ///
982    /// ```
983    /// use futures_util::StreamExt as _;
984    /// use matrix_sdk::{
985    ///     Client, Room,
986    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
987    /// };
988    ///
989    /// # async fn example(client: Client) -> Option<()> {
990    /// let observer =
991    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
992    ///
993    /// let mut subscriber = observer.subscribe();
994    ///
995    /// let (event, (room, push_actions)) = subscriber.next().await?;
996    /// # Some(())
997    /// # }
998    /// ```
999    ///
1000    /// Now let's see how to get several contexts that can be useful for you:
1001    ///
1002    /// ```
1003    /// use matrix_sdk::{
1004    ///     Client, Room,
1005    ///     deserialized_responses::EncryptionInfo,
1006    ///     ruma::{
1007    ///         events::room::{
1008    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1009    ///         },
1010    ///         push::Action,
1011    ///     },
1012    /// };
1013    ///
1014    /// # async fn example(client: Client) {
1015    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1016    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1017    ///
1018    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1019    /// // to distinguish between unencrypted events and events that were decrypted
1020    /// // by the SDK.
1021    /// let _ = client
1022    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1023    ///     );
1024    ///
1025    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1026    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1027    /// // should be highlighted in the timeline.
1028    /// let _ =
1029    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1030    ///
1031    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1032    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1033    /// # }
1034    /// ```
1035    ///
1036    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1037    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1038    where
1039        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1040        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1041    {
1042        self.observe_room_events_impl(None)
1043    }
1044
1045    /// Observe a specific room, and event type.
1046    ///
1047    /// This method works the same way as [`Client::observe_events`], except
1048    /// that the observability will only be applied for events in the room with
1049    /// the specified ID. See that method for more details.
1050    ///
1051    /// Be careful that only the most recent value can be observed. Subscribers
1052    /// are notified when a new value is sent, but there is no guarantee
1053    /// that they will see all values.
1054    pub fn observe_room_events<Ev, Ctx>(
1055        &self,
1056        room_id: &RoomId,
1057    ) -> ObservableEventHandler<(Ev, Ctx)>
1058    where
1059        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1060        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1061    {
1062        self.observe_room_events_impl(Some(room_id.to_owned()))
1063    }
1064
1065    /// Shared implementation for `Client::observe_events` and
1066    /// `Client::observe_room_events`.
1067    fn observe_room_events_impl<Ev, Ctx>(
1068        &self,
1069        room_id: Option<OwnedRoomId>,
1070    ) -> ObservableEventHandler<(Ev, Ctx)>
1071    where
1072        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1073        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1074    {
1075        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1076        // new value.
1077        let shared_observable = SharedObservable::new(None);
1078
1079        ObservableEventHandler::new(
1080            shared_observable.clone(),
1081            self.event_handler_drop_guard(self.add_event_handler_impl(
1082                move |event: Ev, context: Ctx| {
1083                    shared_observable.set(Some((event, context)));
1084
1085                    ready(())
1086                },
1087                room_id,
1088            )),
1089        )
1090    }
1091
1092    /// Subscribe to future `beacon_info` updates for the current user across
1093    /// all rooms.
1094    ///
1095    /// This stream is push-only: it emits only future updates observed during
1096    /// sync processing and does not replay existing state.
1097    pub fn observe_own_beacon_info_updates(
1098        &self,
1099    ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1100        let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1101        let mut stream = observer.subscribe();
1102        let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1103        Ok(async_stream::stream! {
1104            let _observer = observer;
1105
1106            while let Some((event, room)) = stream.next().await {
1107                if event.state_key != own_user_id {
1108                    continue;
1109                }
1110                yield BeaconInfoUpdate {
1111                    room_id: room.room_id().to_owned(),
1112                    event_id: event.event_id,
1113                    content: event.content,
1114                };
1115            }
1116        })
1117    }
1118
1119    /// Remove the event handler associated with the handle.
1120    ///
1121    /// Note that you **must not** call `remove_event_handler` from the
1122    /// non-async part of an event handler, that is:
1123    ///
1124    /// ```ignore
1125    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1126    ///     // ⚠ this will cause a deadlock ⚠
1127    ///     client.remove_event_handler(handle);
1128    ///
1129    ///     async move {
1130    ///         // removing the event handler here is fine
1131    ///         client.remove_event_handler(handle);
1132    ///     }
1133    /// })
1134    /// ```
1135    ///
1136    /// Note also that handlers that remove themselves will still execute with
1137    /// events received in the same sync cycle.
1138    ///
1139    /// # Arguments
1140    ///
1141    /// `handle` - The [`EventHandlerHandle`] that is returned when
1142    /// registering the event handler with [`Client::add_event_handler`].
1143    ///
1144    /// # Examples
1145    ///
1146    /// ```no_run
1147    /// # use url::Url;
1148    /// # use tokio::sync::mpsc;
1149    /// #
1150    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1151    /// #
1152    /// use matrix_sdk::{
1153    ///     Client, event_handler::EventHandlerHandle,
1154    ///     ruma::events::room::member::SyncRoomMemberEvent,
1155    /// };
1156    /// #
1157    /// # futures_executor::block_on(async {
1158    /// # let client = matrix_sdk::Client::builder()
1159    /// #     .homeserver_url(homeserver)
1160    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1161    /// #     .build()
1162    /// #     .await
1163    /// #     .unwrap();
1164    ///
1165    /// client.add_event_handler(
1166    ///     |ev: SyncRoomMemberEvent,
1167    ///      client: Client,
1168    ///      handle: EventHandlerHandle| async move {
1169    ///         // Common usage: Check arriving Event is the expected one
1170    ///         println!("Expected RoomMemberEvent received!");
1171    ///         client.remove_event_handler(handle);
1172    ///     },
1173    /// );
1174    /// # });
1175    /// ```
1176    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1177        self.inner.event_handlers.remove(handle);
1178    }
1179
1180    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1181    /// the given handle.
1182    ///
1183    /// When the returned value is dropped, the event handler will be removed.
1184    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1185        EventHandlerDropGuard::new(handle, self.clone())
1186    }
1187
1188    /// Add an arbitrary value for use as event handler context.
1189    ///
1190    /// The value can be obtained in an event handler by adding an argument of
1191    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1192    ///
1193    /// If a value of the same type has been added before, it will be
1194    /// overwritten.
1195    ///
1196    /// # Examples
1197    ///
1198    /// ```no_run
1199    /// use matrix_sdk::{
1200    ///     Room, event_handler::Ctx,
1201    ///     ruma::events::room::message::SyncRoomMessageEvent,
1202    /// };
1203    /// # #[derive(Clone)]
1204    /// # struct SomeType;
1205    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1206    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1207    /// # futures_executor::block_on(async {
1208    /// # let client = matrix_sdk::Client::builder()
1209    /// #     .homeserver_url(homeserver)
1210    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1211    /// #     .build()
1212    /// #     .await
1213    /// #     .unwrap();
1214    ///
1215    /// // Handle used to send messages to the UI part of the app
1216    /// let my_gui_handle: SomeType = obtain_gui_handle();
1217    ///
1218    /// client.add_event_handler_context(my_gui_handle.clone());
1219    /// client.add_event_handler(
1220    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1221    ///         async move {
1222    ///             // gui_handle.send(DisplayMessage { message: ev });
1223    ///         }
1224    ///     },
1225    /// );
1226    /// # });
1227    /// ```
1228    pub fn add_event_handler_context<T>(&self, ctx: T)
1229    where
1230        T: Clone + Send + Sync + 'static,
1231    {
1232        self.inner.event_handlers.add_context(ctx);
1233    }
1234
1235    /// Register a handler for a notification.
1236    ///
1237    /// Similar to [`Client::add_event_handler`], but only allows functions
1238    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1239    /// [`Client`] for now.
1240    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1241    where
1242        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1243        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1244    {
1245        self.inner.notification_handlers.write().await.push(Box::new(
1246            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1247        ));
1248
1249        self
1250    }
1251
1252    /// Subscribe to all updates for the room with the given ID.
1253    ///
1254    /// The returned receiver will receive a new message for each sync response
1255    /// that contains updates for that room.
1256    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1257        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1258            btree_map::Entry::Vacant(entry) => {
1259                let (tx, rx) = broadcast::channel(8);
1260                entry.insert(tx);
1261                rx
1262            }
1263            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1264        }
1265    }
1266
1267    /// Subscribe to all updates to all rooms, whenever any has been received in
1268    /// a sync response.
1269    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1270        self.inner.room_updates_sender.subscribe()
1271    }
1272
1273    pub(crate) async fn notification_handlers(
1274        &self,
1275    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1276        self.inner.notification_handlers.read().await
1277    }
1278
1279    /// Get all the rooms the client knows about.
1280    ///
1281    /// This will return the list of joined, invited, and left rooms.
1282    pub fn rooms(&self) -> Vec<Room> {
1283        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1284    }
1285
1286    /// Get all the rooms the client knows about, filtered by room state.
1287    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1288        self.base_client()
1289            .rooms_filtered(filter)
1290            .into_iter()
1291            .map(|room| Room::new(self.clone(), room))
1292            .collect()
1293    }
1294
1295    /// Get a stream of all the rooms, in addition to the existing rooms.
1296    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1297        let (rooms, stream) = self.base_client().rooms_stream();
1298
1299        let map_room = |room| Room::new(self.clone(), room);
1300
1301        (
1302            rooms.into_iter().map(map_room).collect(),
1303            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1304        )
1305    }
1306
1307    /// Returns the joined rooms this client knows about.
1308    pub fn joined_rooms(&self) -> Vec<Room> {
1309        self.rooms_filtered(RoomStateFilter::JOINED)
1310    }
1311
1312    /// Returns the invited rooms this client knows about.
1313    pub fn invited_rooms(&self) -> Vec<Room> {
1314        self.rooms_filtered(RoomStateFilter::INVITED)
1315    }
1316
1317    /// Returns the left rooms this client knows about.
1318    pub fn left_rooms(&self) -> Vec<Room> {
1319        self.rooms_filtered(RoomStateFilter::LEFT)
1320    }
1321
1322    /// Returns the joined space rooms this client knows about.
1323    pub fn joined_space_rooms(&self) -> Vec<Room> {
1324        self.base_client()
1325            .rooms_filtered(RoomStateFilter::JOINED)
1326            .into_iter()
1327            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1328            .collect()
1329    }
1330
1331    /// Get a room with the given room id.
1332    ///
1333    /// # Arguments
1334    ///
1335    /// `room_id` - The unique id of the room that should be fetched.
1336    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1337        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1338    }
1339
1340    /// Gets the preview of a room, whether the current user has joined it or
1341    /// not.
1342    pub async fn get_room_preview(
1343        &self,
1344        room_or_alias_id: &RoomOrAliasId,
1345        via: Vec<OwnedServerName>,
1346    ) -> Result<RoomPreview> {
1347        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1348            Ok(room_id) => room_id.to_owned(),
1349            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1350        };
1351
1352        if let Some(room) = self.get_room(&room_id) {
1353            // The cached data can only be trusted if the room state is joined or
1354            // banned: for invite and knock rooms, no updates will be received
1355            // for the rooms after the invite/knock action took place so we may
1356            // have very out to date data for important fields such as
1357            // `join_rule`. For left rooms, the homeserver should return the latest info.
1358            match room.state() {
1359                RoomState::Joined | RoomState::Banned => {
1360                    return Ok(RoomPreview::from_known_room(&room).await);
1361                }
1362                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1363            }
1364        }
1365
1366        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1367    }
1368
1369    /// Resolve a room alias to a room id and a list of servers which know
1370    /// about it.
1371    ///
1372    /// # Arguments
1373    ///
1374    /// `room_alias` - The room alias to be resolved.
1375    pub async fn resolve_room_alias(
1376        &self,
1377        room_alias: &RoomAliasId,
1378    ) -> HttpResult<get_alias::v3::Response> {
1379        let request = get_alias::v3::Request::new(room_alias.to_owned());
1380        self.send(request).await
1381    }
1382
1383    /// Checks if a room alias is not in use yet.
1384    ///
1385    /// Returns:
1386    /// - `Ok(true)` if the room alias is available.
1387    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1388    ///   status code).
1389    /// - An `Err` otherwise.
1390    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1391        match self.resolve_room_alias(alias).await {
1392            // The room alias was resolved, so it's already in use.
1393            Ok(_) => Ok(false),
1394            Err(error) => {
1395                match error.client_api_error_kind() {
1396                    // The room alias wasn't found, so it's available.
1397                    Some(ErrorKind::NotFound) => Ok(true),
1398                    _ => Err(error),
1399                }
1400            }
1401        }
1402    }
1403
1404    /// Adds a new room alias associated with a room to the room directory.
1405    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1406        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1407        self.send(request).await?;
1408        Ok(())
1409    }
1410
1411    /// Removes a room alias from the room directory.
1412    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1413        let request = delete_alias::v3::Request::new(alias.to_owned());
1414        self.send(request).await?;
1415        Ok(())
1416    }
1417
1418    /// Update the homeserver from the login response well-known if needed.
1419    ///
1420    /// # Arguments
1421    ///
1422    /// * `login_well_known` - The `well_known` field from a successful login
1423    ///   response.
1424    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1425        if self.inner.respect_login_well_known
1426            && let Some(well_known) = login_well_known
1427            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1428        {
1429            self.set_homeserver(homeserver);
1430        }
1431    }
1432
1433    /// Similar to [`Client::restore_session_with`], with
1434    /// [`RoomLoadSettings::default()`].
1435    ///
1436    /// # Panics
1437    ///
1438    /// Panics if a session was already restored or logged in.
1439    #[instrument(skip_all)]
1440    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1441        self.restore_session_with(session, RoomLoadSettings::default()).await
1442    }
1443
1444    /// Restore a session previously logged-in using one of the available
1445    /// authentication APIs. The number of rooms to restore is controlled by
1446    /// [`RoomLoadSettings`].
1447    ///
1448    /// See the documentation of the corresponding authentication API's
1449    /// `restore_session` method for more information.
1450    ///
1451    /// # Panics
1452    ///
1453    /// Panics if a session was already restored or logged in.
1454    #[instrument(skip_all)]
1455    pub async fn restore_session_with(
1456        &self,
1457        session: impl Into<AuthSession>,
1458        room_load_settings: RoomLoadSettings,
1459    ) -> Result<()> {
1460        let session = session.into();
1461        match session {
1462            AuthSession::Matrix(session) => {
1463                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1464            }
1465            AuthSession::OAuth(session) => {
1466                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1467            }
1468        }
1469    }
1470
1471    /// Refresh the access token using the authentication API used to log into
1472    /// this session.
1473    ///
1474    /// See the documentation of the authentication API's `refresh_access_token`
1475    /// method for more information.
1476    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1477        let Some(auth_api) = self.auth_api() else {
1478            return Err(RefreshTokenError::RefreshTokenRequired);
1479        };
1480
1481        match auth_api {
1482            AuthApi::Matrix(api) => {
1483                trace!("Token refresh: Using the homeserver.");
1484                Box::pin(api.refresh_access_token()).await?;
1485            }
1486            AuthApi::OAuth(api) => {
1487                trace!("Token refresh: Using OAuth 2.0.");
1488                Box::pin(api.refresh_access_token()).await?;
1489            }
1490        }
1491
1492        Ok(())
1493    }
1494
1495    /// Log out the current session using the proper authentication API.
1496    ///
1497    /// # Errors
1498    ///
1499    /// Returns an error if the session is not authenticated or if an error
1500    /// occurred while making the request to the server.
1501    pub async fn logout(&self) -> Result<(), Error> {
1502        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1503        match auth_api {
1504            AuthApi::Matrix(matrix_auth) => {
1505                matrix_auth.logout().await?;
1506                Ok(())
1507            }
1508            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1509        }
1510    }
1511
1512    /// Get or upload a sync filter.
1513    ///
1514    /// This method will either get a filter ID from the store or upload the
1515    /// filter definition to the homeserver and return the new filter ID.
1516    ///
1517    /// # Arguments
1518    ///
    /// * `filter_name` - The unique name of the filter, this name will be used
    ///   locally to store and identify the filter ID returned by the server.
    ///
    /// * `definition` - The filter definition that should be uploaded to the
    ///   server if no filter ID can be found in the store.
1524    ///
1525    /// # Examples
1526    ///
1527    /// ```no_run
1528    /// # use matrix_sdk::{
1529    /// #    Client, config::SyncSettings,
1530    /// #    ruma::api::client::{
1531    /// #        filter::{
1532    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1533    /// #        },
1534    /// #        sync::sync_events::v3::Filter,
1535    /// #    }
1536    /// # };
1537    /// # use url::Url;
1538    /// # async {
1539    /// # let homeserver = Url::parse("http://example.com").unwrap();
1540    /// # let client = Client::new(homeserver).await.unwrap();
1541    /// let mut filter = FilterDefinition::default();
1542    ///
1543    /// // Let's enable member lazy loading.
1544    /// filter.room.state.lazy_load_options =
1545    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1546    ///
1547    /// let filter_id = client
1548    ///     .get_or_upload_filter("sync", filter)
1549    ///     .await
1550    ///     .unwrap();
1551    ///
1552    /// let sync_settings = SyncSettings::new()
1553    ///     .filter(Filter::FilterId(filter_id));
1554    ///
1555    /// let response = client.sync_once(sync_settings).await.unwrap();
1556    /// # };
1557    #[instrument(skip(self, definition))]
1558    pub async fn get_or_upload_filter(
1559        &self,
1560        filter_name: &str,
1561        definition: FilterDefinition,
1562    ) -> Result<String> {
1563        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1564            debug!("Found filter locally");
1565            Ok(filter)
1566        } else {
1567            debug!("Didn't find filter locally");
1568            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1569            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1570            let response = self.send(request).await?;
1571
1572            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1573
1574            Ok(response.filter_id)
1575        }
1576    }
1577
1578    /// Prepare to join a room by ID, by getting the current details about it
1579    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1580        let room = self.get_room(room_id)?;
1581
1582        let inviter = match room.invite_details().await {
1583            Ok(details) => details.inviter,
1584            Err(Error::WrongRoomState(_)) => None,
1585            Err(e) => {
1586                warn!("Error fetching invite details for room: {e:?}");
1587                None
1588            }
1589        };
1590
1591        Some(PreJoinRoomInfo { inviter })
1592    }
1593
1594    /// Finish joining a room.
1595    ///
1596    /// If the room was an invite that should be marked as a DM, will include it
1597    /// in the DM event after creating the joined room.
1598    ///
1599    /// If encrypted history sharing is enabled, will check to see if we have a
1600    /// key bundle, and import it if so.
1601    ///
1602    /// # Arguments
1603    ///
1604    /// * `room_id` - The `RoomId` of the room that was joined.
1605    /// * `pre_join_room_info` - Information about the room before we joined.
1606    async fn finish_join_room(
1607        &self,
1608        room_id: &RoomId,
1609        pre_join_room_info: Option<PreJoinRoomInfo>,
1610    ) -> Result<Room> {
1611        info!(?room_id, ?pre_join_room_info, "Completing room join");
1612        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1613            room.state() == RoomState::Invited
1614                && room.is_direct().await.unwrap_or_else(|e| {
1615                    warn!(%room_id, "is_direct() failed: {e}");
1616                    false
1617                })
1618        } else {
1619            false
1620        };
1621
1622        let base_room = self
1623            .base_client()
1624            .room_joined(
1625                room_id,
1626                pre_join_room_info
1627                    .as_ref()
1628                    .and_then(|info| info.inviter.as_ref())
1629                    .map(|i| i.user_id().to_owned()),
1630            )
1631            .await?;
1632        let room = Room::new(self.clone(), base_room);
1633
1634        if mark_as_dm {
1635            room.set_is_direct(true).await?;
1636        }
1637
1638        // If we joined following an invite, check if we had previously received a key
1639        // bundle from the inviter, and import it if so.
1640        //
1641        // It's important that we only do this once `BaseClient::room_joined` has
1642        // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1643        // race.
1644        #[cfg(feature = "e2e-encryption")]
1645        if self.inner.enable_share_history_on_invite
1646            && let Some(inviter) =
1647                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1648        {
1649            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1650                .await?;
1651        }
1652
1653        // Suppress "unused variable" and "unused field" lints
1654        #[cfg(not(feature = "e2e-encryption"))]
1655        let _ = pre_join_room_info.map(|i| i.inviter);
1656
1657        Ok(room)
1658    }
1659
1660    /// Join a room by `RoomId`.
1661    ///
1662    /// Returns the `Room` in the joined state.
1663    ///
1664    /// # Arguments
1665    ///
1666    /// * `room_id` - The `RoomId` of the room to be joined.
1667    #[instrument(skip(self))]
1668    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1669        // See who invited us to this room, if anyone. Note we have to do this before
1670        // making the `/join` request, otherwise we could race against the sync.
1671        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1672
1673        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1674        let response = self.send(request).await?;
1675        self.finish_join_room(&response.room_id, pre_join_info).await
1676    }
1677
1678    /// Join a room by `RoomOrAliasId`.
1679    ///
1680    /// Returns the `Room` in the joined state.
1681    ///
1682    /// # Arguments
1683    ///
1684    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1685    ///   alias looks like `#name:example.com`.
1686    /// * `server_names` - The server names to be used for resolving the alias,
1687    ///   if needs be.
1688    #[instrument(skip(self))]
1689    pub async fn join_room_by_id_or_alias(
1690        &self,
1691        alias: &RoomOrAliasId,
1692        server_names: &[OwnedServerName],
1693    ) -> Result<Room> {
1694        let pre_join_info = {
1695            match alias.try_into() {
1696                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1697                Err(_) => {
1698                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1699                    // responding to an invitation to the room, and therefore don't need to handle
1700                    // things that happen as a result of invites.
1701                    None
1702                }
1703            }
1704        };
1705        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1706            via: server_names.to_owned(),
1707        });
1708        let response = self.send(request).await?;
1709        self.finish_join_room(&response.room_id, pre_join_info).await
1710    }
1711
1712    /// Search the homeserver's directory of public rooms.
1713    ///
1714    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1715    /// a `get_public_rooms::Response`.
1716    ///
1717    /// # Arguments
1718    ///
1719    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1720    ///
1721    /// * `since` - Pagination token from a previous request.
1722    ///
1723    /// * `server` - The name of the server, if `None` the requested server is
1724    ///   used.
1725    ///
1726    /// # Examples
1727    /// ```no_run
1728    /// use matrix_sdk::Client;
1729    /// # use url::Url;
1730    /// # let homeserver = Url::parse("http://example.com").unwrap();
1731    /// # let limit = Some(10);
1732    /// # let since = Some("since token");
1733    /// # let server = Some("servername.com".try_into().unwrap());
1734    /// # async {
1735    /// let mut client = Client::new(homeserver).await.unwrap();
1736    ///
1737    /// client.public_rooms(limit, since, server).await;
1738    /// # };
1739    /// ```
1740    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1741    pub async fn public_rooms(
1742        &self,
1743        limit: Option<u32>,
1744        since: Option<&str>,
1745        server: Option<&ServerName>,
1746    ) -> HttpResult<get_public_rooms::v3::Response> {
1747        let limit = limit.map(UInt::from);
1748
1749        let request = assign!(get_public_rooms::v3::Request::new(), {
1750            limit,
1751            since: since.map(ToOwned::to_owned),
1752            server: server.map(ToOwned::to_owned),
1753        });
1754        self.send(request).await
1755    }
1756
1757    /// Create a room with the given parameters.
1758    ///
1759    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1760    /// created room.
1761    ///
1762    /// If you want to create a direct message with one specific user, you can
1763    /// use [`create_dm`][Self::create_dm], which is more convenient than
1764    /// assembling the [`create_room::v3::Request`] yourself.
1765    ///
1766    /// If the `is_direct` field of the request is set to `true` and at least
1767    /// one user is invited, the room will be automatically added to the direct
1768    /// rooms in the account data.
1769    ///
1770    /// # Examples
1771    ///
1772    /// ```no_run
1773    /// use matrix_sdk::{
1774    ///     Client,
1775    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1776    /// };
1777    /// # use url::Url;
1778    /// #
1779    /// # async {
1780    /// # let homeserver = Url::parse("http://example.com").unwrap();
1781    /// let request = CreateRoomRequest::new();
1782    /// let client = Client::new(homeserver).await.unwrap();
1783    /// assert!(client.create_room(request).await.is_ok());
1784    /// # };
1785    /// ```
1786    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1787        let invite = request.invite.clone();
1788        let is_direct_room = request.is_direct;
1789        let response = self.send(request).await?;
1790
1791        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1792
1793        let joined_room = Room::new(self.clone(), base_room);
1794
1795        if is_direct_room
1796            && !invite.is_empty()
1797            && let Err(error) =
1798                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1799        {
1800            // FIXME: Retry in the background
1801            error!("Failed to mark room as DM: {error}");
1802        }
1803
1804        Ok(joined_room)
1805    }
1806
1807    /// Create a DM room.
1808    ///
1809    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1810    /// given user being invited, the room marked `is_direct` and both the
1811    /// creator and invitee getting the default maximum power level.
1812    ///
1813    /// If the `e2e-encryption` feature is enabled, the room will also be
1814    /// encrypted.
1815    ///
1816    /// # Arguments
1817    ///
1818    /// * `user_id` - The ID of the user to create a DM for.
1819    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1820        #[cfg(feature = "e2e-encryption")]
1821        let initial_state = vec![
1822            InitialStateEvent::with_empty_state_key(
1823                RoomEncryptionEventContent::with_recommended_defaults(),
1824            )
1825            .to_raw_any(),
1826        ];
1827
1828        #[cfg(not(feature = "e2e-encryption"))]
1829        let initial_state = vec![];
1830
1831        let request = assign!(create_room::v3::Request::new(), {
1832            invite: vec![user_id.to_owned()],
1833            is_direct: true,
1834            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1835            initial_state,
1836        });
1837
1838        self.create_room(request).await
1839    }
1840
1841    /// Get the first existing DM room with the given user, if any.
1842    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1843        self.get_dm_rooms(user_id).next()
1844    }
1845
1846    /// Get an iterator with the existing DM rooms for the given user.
1847    pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1848        let rooms = self.joined_rooms();
1849
1850        let dm_definition = &self.base_client().dm_room_definition;
1851
1852        // Find the room we share with the `user_id` and only with `user_id`
1853        let rooms = rooms.into_iter().filter(move |r| {
1854            let targets = r.direct_targets();
1855            let targets_match =
1856                targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1857            match dm_definition {
1858                DmRoomDefinition::MatrixSpec => targets_match,
1859                DmRoomDefinition::TwoMembers => {
1860                    let service_members_count =
1861                        r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1862                    let active_non_service_members =
1863                        r.active_members_count().saturating_sub(service_members_count);
1864                    targets_match && active_non_service_members <= 2
1865                }
1866            }
1867        });
1868
1869        trace!(?user_id, ?rooms, "Found DM rooms with user");
1870        rooms
1871    }
1872
1873    /// Search the homeserver's directory for public rooms with a filter.
1874    ///
1875    /// # Arguments
1876    ///
1877    /// * `room_search` - The easiest way to create this request is using the
1878    ///   `get_public_rooms_filtered::Request` itself.
1879    ///
1880    /// # Examples
1881    ///
1882    /// ```no_run
1883    /// # use url::Url;
1884    /// # use matrix_sdk::Client;
1885    /// # async {
1886    /// # let homeserver = Url::parse("http://example.com")?;
1887    /// use matrix_sdk::ruma::{
1888    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1889    /// };
1890    /// # let mut client = Client::new(homeserver).await?;
1891    ///
1892    /// let mut filter = Filter::new();
1893    /// filter.generic_search_term = Some("rust".to_owned());
1894    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1895    /// request.filter = filter;
1896    ///
1897    /// let response = client.public_rooms_filtered(request).await?;
1898    ///
1899    /// for room in response.chunk {
1900    ///     println!("Found room {room:?}");
1901    /// }
1902    /// # anyhow::Ok(()) };
1903    /// ```
1904    pub async fn public_rooms_filtered(
1905        &self,
1906        request: get_public_rooms_filtered::v3::Request,
1907    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1908        self.send(request).await
1909    }
1910
1911    /// Send an arbitrary request to the server, without updating client state.
1912    ///
1913    /// **Warning:** Because this method *does not* update the client state, it
1914    /// is important to make sure that you account for this yourself, and
1915    /// use wrapper methods where available.  This method should *only* be
1916    /// used if a wrapper method for the endpoint you'd like to use is not
1917    /// available.
1918    ///
1919    /// # Arguments
1920    ///
1921    /// * `request` - A filled out and valid request for the endpoint to be hit
1922    ///
1923    /// * `timeout` - An optional request timeout setting, this overrides the
1924    ///   default request setting if one was set.
1925    ///
1926    /// # Examples
1927    ///
1928    /// ```no_run
1929    /// # use matrix_sdk::{Client, config::SyncSettings};
1930    /// # use url::Url;
1931    /// # async {
1932    /// # let homeserver = Url::parse("http://localhost:8080")?;
1933    /// # let mut client = Client::new(homeserver).await?;
1934    /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1935    ///
1936    /// // First construct the request you want to make
1937    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1938    /// // for all available Endpoints
1939    /// let user_id = owned_user_id!("@example:localhost");
1940    /// let request = profile::get_profile::v3::Request::new(user_id);
1941    ///
1942    /// // Start the request using Client::send()
1943    /// let response = client.send(request).await?;
1944    ///
1945    /// // Check the corresponding Response struct to find out what types are
1946    /// // returned
1947    /// # anyhow::Ok(()) };
1948    /// ```
1949    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1950    where
1951        Request: OutgoingRequest + Clone + Debug,
1952        Request::Authentication: SupportedAuthScheme,
1953        Request::PathBuilder: SupportedPathBuilder,
1954        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1955        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1956    {
1957        SendRequest {
1958            client: self.clone(),
1959            request,
1960            config: None,
1961            send_progress: Default::default(),
1962        }
1963    }
1964
1965    pub(crate) async fn send_inner<Request>(
1966        &self,
1967        request: Request,
1968        config: Option<RequestConfig>,
1969        send_progress: SharedObservable<TransmissionProgress>,
1970    ) -> HttpResult<Request::IncomingResponse>
1971    where
1972        Request: OutgoingRequest + Debug,
1973        Request::Authentication: SupportedAuthScheme,
1974        Request::PathBuilder: SupportedPathBuilder,
1975        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1976        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1977    {
1978        let homeserver = self.homeserver().to_string();
1979        let access_token = self.access_token();
1980        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1981
1982        let path_builder_input =
1983            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1984
1985        let result = self
1986            .inner
1987            .http_client
1988            .send(
1989                request,
1990                config,
1991                homeserver,
1992                access_token.as_deref(),
1993                path_builder_input,
1994                send_progress,
1995            )
1996            .await;
1997
1998        if let Err(Some(ErrorKind::UnknownToken { .. })) =
1999            result.as_ref().map_err(HttpError::client_api_error_kind)
2000            && let Some(access_token) = &access_token
2001        {
2002            // Mark the access token as expired.
2003            self.auth_ctx().set_access_token_expired(access_token);
2004        }
2005
2006        result
2007    }
2008
2009    fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2010        _ = self
2011            .inner
2012            .auth_ctx
2013            .session_change_sender
2014            .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2015    }
2016
2017    /// Fetches server versions from network; no caching.
2018    pub async fn fetch_server_versions(
2019        &self,
2020        request_config: Option<RequestConfig>,
2021    ) -> HttpResult<get_supported_versions::Response> {
2022        // Since this was called by the user, try to refresh the access token if
2023        // necessary.
2024        self.fetch_server_versions_inner(false, request_config).await
2025    }
2026
2027    /// Fetches server versions from network; no caching.
2028    ///
2029    /// If the access token is expired and `failsafe` is `false`, this will
2030    /// attempt to refresh the access token, otherwise this will try to make an
2031    /// unauthenticated request instead.
2032    pub(crate) async fn fetch_server_versions_inner(
2033        &self,
2034        failsafe: bool,
2035        request_config: Option<RequestConfig>,
2036    ) -> HttpResult<get_supported_versions::Response> {
2037        if !failsafe {
2038            // `Client::send()` handles refreshing access tokens.
2039            return self
2040                .send(get_supported_versions::Request::new())
2041                .with_request_config(request_config)
2042                .await;
2043        }
2044
2045        let homeserver = self.homeserver().to_string();
2046
2047        // If we have a fresh access token, try with it first.
2048        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2049            && self.auth_ctx().has_valid_access_token()
2050            && let Some(access_token) = self.access_token()
2051        {
2052            let result = self
2053                .inner
2054                .http_client
2055                .send(
2056                    get_supported_versions::Request::new(),
2057                    request_config,
2058                    homeserver.clone(),
2059                    Some(&access_token),
2060                    (),
2061                    Default::default(),
2062                )
2063                .await;
2064
2065            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2066                result.as_ref().map_err(HttpError::client_api_error_kind)
2067            {
2068                // If the access token is actually expired, mark it as expired and fallback to
2069                // the unauthenticated request below.
2070                self.auth_ctx().set_access_token_expired(&access_token);
2071            } else {
2072                // If the request succeeded or it's an other error, just stop now.
2073                return result;
2074            }
2075        }
2076
2077        // Try without authentication.
2078        self.inner
2079            .http_client
2080            .send(
2081                get_supported_versions::Request::new(),
2082                request_config,
2083                homeserver.clone(),
2084                None,
2085                (),
2086                Default::default(),
2087            )
2088            .await
2089    }
2090
2091    /// Fetches client well_known from network; no caching.
2092    ///
2093    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2094    ///    well-known contents.
2095    /// 2. If it's not, we try extracting the server name from the
2096    ///    [`Client::user_id`] and building the server URL from it.
2097    /// 3. If we couldn't get the well-known contents with either the explicit
2098    ///    server name or the implicit extracted one, we try the homeserver URL
2099    ///    as a last resort.
2100    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2101        let homeserver = self.homeserver();
2102        let scheme = homeserver.scheme();
2103
2104        // Use the server name, either an explicit one or an implicit one taken from
2105        // the user id: sometimes we'll have only the homeserver url available and no
2106        // server name, but the server name can be extracted from the current user id.
2107        let server_url = self
2108            .server()
2109            .map(|server| server.to_string())
2110            // If the server name wasn't available, extract it from the user id and build a URL:
2111            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2112            // will be the same for the public server url, lacking a better candidate.
2113            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2114
2115        // If the server name is available, first try using it
2116        let response = if let Some(server_url) = server_url {
2117            // First try using the server name
2118            self.fetch_client_well_known_with_url(server_url).await
2119        } else {
2120            None
2121        };
2122
2123        // If we didn't get a well-known value yet, try with the homeserver url instead:
2124        if response.is_none() {
2125            // Sometimes people configure their well-known directly on the homeserver so use
2126            // this as a fallback when the server name is unknown.
2127            warn!(
2128                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2129            );
2130            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2131        } else {
2132            response
2133        }
2134    }
2135
2136    async fn fetch_client_well_known_with_url(
2137        &self,
2138        url: String,
2139    ) -> Option<discover_homeserver::Response> {
2140        let well_known = self
2141            .inner
2142            .http_client
2143            .send(
2144                discover_homeserver::Request::new(),
2145                Some(RequestConfig::short_retry()),
2146                url,
2147                None,
2148                (),
2149                Default::default(),
2150            )
2151            .await;
2152
2153        match well_known {
2154            Ok(well_known) => Some(well_known),
2155            Err(http_error) => {
2156                // It is perfectly valid to not have a well-known file.
2157                // Maybe we should check for a specific error code to be sure?
2158                warn!("Failed to fetch client well-known: {http_error}");
2159                None
2160            }
2161        }
2162    }
2163
2164    /// Load supported versions from storage, or fetch them from network and
2165    /// cache them.
2166    ///
2167    /// If `failsafe` is true, this will try to minimize side effects to avoid
2168    /// possible deadlocks.
2169    async fn fetch_supported_versions(
2170        &self,
2171        failsafe: bool,
2172    ) -> HttpResult<SupportedVersionsResponse> {
2173        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2174        let supported_versions = SupportedVersionsResponse {
2175            versions: server_versions.versions,
2176            unstable_features: server_versions.unstable_features,
2177        };
2178
2179        Ok(supported_versions)
2180    }
2181
2182    /// Get the Matrix versions and features supported by the homeserver by
2183    /// fetching them from the server or the cache.
2184    ///
2185    /// This is equivalent to calling both [`Client::server_versions()`] and
2186    /// [`Client::unstable_features()`]. To always fetch the result from the
2187    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2188    /// and then `.as_supported_versions()` on the response.
2189    ///
2190    /// # Examples
2191    ///
2192    /// ```no_run
2193    /// use ruma::api::{FeatureFlag, MatrixVersion};
2194    /// # use matrix_sdk::{Client, config::SyncSettings};
2195    /// # use url::Url;
2196    /// # async {
2197    /// # let homeserver = Url::parse("http://localhost:8080")?;
2198    /// # let mut client = Client::new(homeserver).await?;
2199    ///
2200    /// let supported = client.supported_versions().await?;
2201    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2202    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2203    ///
2204    /// let msc_x_feature = FeatureFlag::from("msc_x");
2205    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2206    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2207    /// # anyhow::Ok(()) };
2208    /// ```
2209    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2210        self.supported_versions_inner(false).await
2211    }
2212
2213    /// Get the Matrix versions and features supported by the homeserver by
2214    /// fetching them from the server or the cache.
2215    ///
2216    /// If `failsafe` is true, this will try to minimize side effects to avoid
2217    /// possible deadlocks.
2218    pub(crate) async fn supported_versions_inner(
2219        &self,
2220        failsafe: bool,
2221    ) -> HttpResult<SupportedVersions> {
2222        match self.supported_versions_cached_inner(failsafe).await {
2223            Ok(Some(value)) => {
2224                return Ok(value);
2225            }
2226            Ok(None) => {
2227                // The cache is empty, make a request.
2228            }
2229            Err(error) => {
2230                warn!("error when loading cached supported versions: {error}");
2231                // Fallthrough to make a request.
2232            }
2233        }
2234
2235        self.refresh_supported_versions_cache(failsafe).await
2236    }
2237
2238    /// Refresh the Matrix versions and features supported by the homeserver in
2239    /// the cache.
2240    ///
2241    /// If `failsafe` is true, this will try to minimize side effects to avoid
2242    /// possible deadlocks.
2243    async fn refresh_supported_versions_cache(
2244        &self,
2245        failsafe: bool,
2246    ) -> HttpResult<SupportedVersions> {
2247        let cached_supported_versions = &self.inner.caches.supported_versions;
2248
2249        let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
2250            Ok(guard) => guard,
2251            Err(_) => {
2252                // There is already a refresh in progress, wait for it to finish.
2253                let guard = cached_supported_versions.refresh_lock.lock().await;
2254
2255                if let Err(error) = guard.as_ref() {
2256                    // There was an error in the previous refresh, return it.
2257                    return Err(HttpError::Cached(error.clone()));
2258                }
2259
2260                // Reuse the data if it was cached and it hasn't expired.
2261                if let CachedValue::Cached(value) = cached_supported_versions.value()
2262                    && !value.has_expired()
2263                {
2264                    return Ok(value.into_data());
2265                }
2266
2267                // The data wasn't cached or has expired, we need to make another request.
2268                guard
2269            }
2270        };
2271
2272        let response = match self.fetch_supported_versions(failsafe).await {
2273            Ok(response) => {
2274                *supported_versions_guard = Ok(());
2275                TtlValue::new(response)
2276            }
2277            Err(error) => {
2278                let error = Arc::new(error);
2279                *supported_versions_guard = Err(error.clone());
2280                return Err(HttpError::Cached(error));
2281            }
2282        };
2283
2284        let supported_versions = response.as_ref().map(|response| response.supported_versions());
2285
2286        // Only cache the result if the request was authenticated.
2287        if self.auth_ctx().has_valid_access_token() {
2288            if let Err(err) = self
2289                .state_store()
2290                .set_kv_data(
2291                    StateStoreDataKey::SupportedVersions,
2292                    StateStoreDataValue::SupportedVersions(response),
2293                )
2294                .await
2295            {
2296                warn!("error when caching supported versions: {err}");
2297            }
2298
2299            cached_supported_versions.set_value(supported_versions.clone());
2300        }
2301
2302        Ok(supported_versions.into_data())
2303    }
2304
2305    /// Get the Matrix versions and features supported by the homeserver by
2306    /// fetching them from the cache.
2307    ///
2308    /// For a version of this function that fetches the supported versions and
2309    /// features from the homeserver if the [`SupportedVersions`] aren't
2310    /// found in the cache, take a look at the [`Client::supported_versions()`]
2311    /// method.
2312    ///
2313    /// If the data in the cache has expired, this will trigger a background
2314    /// task to refresh it.
2315    ///
2316    /// # Examples
2317    ///
2318    /// ```no_run
2319    /// use ruma::api::{FeatureFlag, MatrixVersion};
2320    /// # use matrix_sdk::{Client, config::SyncSettings};
2321    /// # use url::Url;
2322    /// # async {
2323    /// # let homeserver = Url::parse("http://localhost:8080")?;
2324    /// # let mut client = Client::new(homeserver).await?;
2325    ///
2326    /// let supported =
2327    ///     if let Some(supported) = client.supported_versions_cached().await? {
2328    ///         supported
2329    ///     } else {
2330    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2331    ///     };
2332    ///
2333    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2334    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2335    ///
2336    /// let msc_x_feature = FeatureFlag::from("msc_x");
2337    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2338    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2339    /// # anyhow::Ok(()) };
2340    /// ```
2341    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2342        self.supported_versions_cached_inner(false).await
2343    }
2344
2345    async fn supported_versions_cached_inner(
2346        &self,
2347        failsafe: bool,
2348    ) -> Result<Option<SupportedVersions>, StoreError> {
2349        let supported_versions_cache = &self.inner.caches.supported_versions;
2350
2351        let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2352            cached
2353        } else if let Some(stored) = self
2354            .state_store()
2355            .get_kv_data(StateStoreDataKey::SupportedVersions)
2356            .await?
2357            .and_then(|value| value.into_supported_versions())
2358        {
2359            let stored = stored.map(|response| response.supported_versions());
2360
2361            // Copy the data from the store in the in-memory cache.
2362            supported_versions_cache.set_value(stored.clone());
2363
2364            stored
2365        } else {
2366            return Ok(None);
2367        };
2368
2369        // Spawn a task to refresh the cache if it has expired and we have a valid
2370        // access token.
2371        if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2372            debug!("spawning task to refresh supported versions cache");
2373
2374            let client = self.clone();
2375            self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2376                if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2377                    warn!("failed to refresh supported versions cache: {error}");
2378                }
2379            });
2380        }
2381
2382        Ok(Some(value.into_data()))
2383    }
2384
2385    /// Get the Matrix versions supported by the homeserver by fetching them
2386    /// from the server or the cache.
2387    ///
2388    /// # Examples
2389    ///
2390    /// ```no_run
2391    /// use ruma::api::MatrixVersion;
2392    /// # use matrix_sdk::{Client, config::SyncSettings};
2393    /// # use url::Url;
2394    /// # async {
2395    /// # let homeserver = Url::parse("http://localhost:8080")?;
2396    /// # let mut client = Client::new(homeserver).await?;
2397    ///
2398    /// let server_versions = client.server_versions().await?;
2399    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2400    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2401    /// # anyhow::Ok(()) };
2402    /// ```
2403    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2404        Ok(self.supported_versions().await?.versions)
2405    }
2406
2407    /// Get the unstable features supported by the homeserver by fetching them
2408    /// from the server or the cache.
2409    ///
2410    /// # Examples
2411    ///
2412    /// ```no_run
2413    /// use matrix_sdk::ruma::api::FeatureFlag;
2414    /// # use matrix_sdk::{Client, config::SyncSettings};
2415    /// # use url::Url;
2416    /// # async {
2417    /// # let homeserver = Url::parse("http://localhost:8080")?;
2418    /// # let mut client = Client::new(homeserver).await?;
2419    ///
2420    /// let msc_x_feature = FeatureFlag::from("msc_x");
2421    /// let unstable_features = client.unstable_features().await?;
2422    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2423    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2424    /// # anyhow::Ok(()) };
2425    /// ```
2426    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2427        Ok(self.supported_versions().await?.features)
2428    }
2429
2430    /// Empty the supported versions and unstable features cache.
2431    ///
2432    /// Since the SDK caches the supported versions, it's possible to have a
    /// stale entry in the cache. This function makes it possible to force
2434    /// reset it.
2435    pub async fn reset_supported_versions(&self) -> Result<()> {
2436        // Empty the in-memory cache.
2437        self.inner.caches.supported_versions.reset();
2438
2439        // Empty the store cache.
2440        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2441    }
2442
2443    /// Get the well-known file of the homeserver from the cache.
2444    ///
2445    /// If the data in the cache has expired, this will trigger a background
2446    /// task to refresh it.
2447    async fn well_known_cached(
2448        &self,
2449    ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2450        let well_known_cache = &self.inner.caches.well_known;
2451
2452        let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2453            cached
2454        } else if let Some(stored) = self
2455            .state_store()
2456            .get_kv_data(StateStoreDataKey::WellKnown)
2457            .await?
2458            .and_then(|value| value.into_well_known())
2459        {
2460            // Copy the data from the store into the in-memory cache.
2461            well_known_cache.set_value(stored.clone());
2462
2463            stored
2464        } else {
2465            return Ok(CachedValue::NotSet);
2466        };
2467
2468        // Spawn a task to refresh the cache if it has expired.
2469        if value.has_expired() {
2470            debug!("spawning task to refresh well-known cache");
2471
2472            let client = self.clone();
2473            self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2474                client.refresh_well_known_cache().await;
2475            });
2476        }
2477
2478        Ok(CachedValue::Cached(value.into_data()))
2479    }
2480
2481    /// Refresh the well-known file of the homeserver in the cache.
2482    async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2483        let well_known_cache = &self.inner.caches.well_known;
2484
2485        let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2486            Ok(guard) => guard,
2487            Err(_) => {
2488                // There is already a refresh in progress, wait for it to finish.
2489                let guard = well_known_cache.refresh_lock.lock().await;
2490
2491                // A refresh can't fail because we ignore failures, so there shouldn't be an
2492                // error in the refresh lock.
2493
2494                // Reuse the data if it was cached and it hasn't expired.
2495                if let CachedValue::Cached(value) = well_known_cache.value()
2496                    && !value.has_expired()
2497                {
2498                    return value.into_data();
2499                }
2500
2501                // The data wasn't cached or has expired, we need to make another request.
2502                guard
2503            }
2504        };
2505
2506        let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2507
2508        if let Err(err) = self
2509            .state_store()
2510            .set_kv_data(
2511                StateStoreDataKey::WellKnown,
2512                StateStoreDataValue::WellKnown(well_known.clone()),
2513            )
2514            .await
2515        {
2516            warn!("error when caching well-known: {err}");
2517        }
2518
2519        well_known_cache.set_value(well_known.clone());
2520
2521        well_known.into_data()
2522    }
2523
2524    /// Get the well-known file of the homeserver by fetching it from the server
2525    /// or the cache.
2526    async fn well_known(&self) -> Option<WellKnownResponse> {
2527        match self.well_known_cached().await {
2528            Ok(CachedValue::Cached(value)) => {
2529                return value;
2530            }
2531            Ok(CachedValue::NotSet) => {
2532                // The cache is empty, make a request.
2533            }
2534            Err(error) => {
2535                warn!("error when loading cached well-known: {error}");
2536                // Fallthrough to make a request.
2537            }
2538        }
2539
2540        self.refresh_well_known_cache().await
2541    }
2542
2543    /// Get information about the homeserver's advertised RTC foci by fetching
2544    /// the well-known file from the server or the cache.
2545    ///
2546    /// # Examples
2547    /// ```no_run
2548    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2549    /// # use url::Url;
2550    /// # async {
2551    /// # let homeserver = Url::parse("http://localhost:8080")?;
2552    /// # let mut client = Client::new(homeserver).await?;
2553    /// let rtc_foci = client.rtc_foci().await?;
2554    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2555    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2556    ///     _ => None,
2557    /// });
2558    /// if let Some(info) = default_livekit_focus_info {
2559    ///     println!("Default LiveKit service URL: {}", info.service_url);
2560    /// }
2561    /// # anyhow::Ok(()) };
2562    /// ```
2563    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2564        let well_known = self.well_known().await;
2565
2566        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2567    }
2568
2569    /// Empty the well-known cache.
2570    ///
2571    /// Since the SDK caches the well-known, it's possible to have a stale entry
    /// in the cache. This function makes it possible to force reset it.
2573    pub async fn reset_well_known(&self) -> Result<()> {
2574        // Empty the in-memory caches.
2575        self.inner.caches.well_known.reset();
2576
2577        // Empty the store cache.
2578        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2579    }
2580
2581    /// Check whether MSC 4028 is enabled on the homeserver.
2582    ///
2583    /// # Examples
2584    ///
2585    /// ```no_run
2586    /// # use matrix_sdk::{Client, config::SyncSettings};
2587    /// # use url::Url;
2588    /// # async {
2589    /// # let homeserver = Url::parse("http://localhost:8080")?;
2590    /// # let mut client = Client::new(homeserver).await?;
2591    /// let msc4028_enabled =
2592    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2593    /// # anyhow::Ok(()) };
2594    /// ```
2595    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2596        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2597    }
2598
2599    /// Get information of all our own devices.
2600    ///
2601    /// # Examples
2602    ///
2603    /// ```no_run
2604    /// # use matrix_sdk::{Client, config::SyncSettings};
2605    /// # use url::Url;
2606    /// # async {
2607    /// # let homeserver = Url::parse("http://localhost:8080")?;
2608    /// # let mut client = Client::new(homeserver).await?;
2609    /// let response = client.devices().await?;
2610    ///
2611    /// for device in response.devices {
2612    ///     println!(
2613    ///         "Device: {} {}",
2614    ///         device.device_id,
2615    ///         device.display_name.as_deref().unwrap_or("")
2616    ///     );
2617    /// }
2618    /// # anyhow::Ok(()) };
2619    /// ```
2620    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2621        let request = get_devices::v3::Request::new();
2622
2623        self.send(request).await
2624    }
2625
2626    /// Delete the given devices from the server.
2627    ///
2628    /// # Arguments
2629    ///
2630    /// * `devices` - The list of devices that should be deleted from the
2631    ///   server.
2632    ///
2633    /// * `auth_data` - This request requires user interactive auth, the first
2634    ///   request needs to set this to `None` and will always fail with an
2635    ///   `UiaaResponse`. The response will contain information for the
2636    ///   interactive auth and the same request needs to be made but this time
2637    ///   with some `auth_data` provided.
2638    ///
2639    /// ```no_run
2640    /// # use matrix_sdk::{
2641    /// #    ruma::{api::client::uiaa, owned_device_id},
2642    /// #    Client, Error, config::SyncSettings,
2643    /// # };
2644    /// # use serde_json::json;
2645    /// # use url::Url;
2646    /// # use std::collections::BTreeMap;
2647    /// # async {
2648    /// # let homeserver = Url::parse("http://localhost:8080")?;
2649    /// # let mut client = Client::new(homeserver).await?;
2650    /// let devices = &[owned_device_id!("DEVICEID")];
2651    ///
2652    /// if let Err(e) = client.delete_devices(devices, None).await {
2653    ///     if let Some(info) = e.as_uiaa_response() {
2654    ///         let mut password = uiaa::Password::new(
2655    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2656    ///             "wordpass".to_owned(),
2657    ///         );
2658    ///         password.session = info.session.clone();
2659    ///
2660    ///         client
2661    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2662    ///             .await?;
2663    ///     }
2664    /// }
    /// # anyhow::Ok(()) };
    /// ```
2666    pub async fn delete_devices(
2667        &self,
2668        devices: &[OwnedDeviceId],
2669        auth_data: Option<uiaa::AuthData>,
2670    ) -> HttpResult<delete_devices::v3::Response> {
2671        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2672        request.auth = auth_data;
2673
2674        self.send(request).await
2675    }
2676
2677    /// Change the display name of a device owned by the current user.
2678    ///
2679    /// Returns a `update_device::Response` which specifies the result
2680    /// of the operation.
2681    ///
2682    /// # Arguments
2683    ///
2684    /// * `device_id` - The ID of the device to change the display name of.
2685    /// * `display_name` - The new display name to set.
2686    pub async fn rename_device(
2687        &self,
2688        device_id: &DeviceId,
2689        display_name: &str,
2690    ) -> HttpResult<update_device::v3::Response> {
2691        let mut request = update_device::v3::Request::new(device_id.to_owned());
2692        request.display_name = Some(display_name.to_owned());
2693
2694        self.send(request).await
2695    }
2696
2697    /// Check whether a device with a specific ID exists on the server.
2698    ///
2699    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2700    /// with 404 and the underlying error otherwise.
2701    ///
2702    /// # Arguments
2703    ///
2704    /// * `device_id` - The ID of the device to query.
2705    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2706        let request = device::get_device::v3::Request::new(device_id);
2707        match self.send(request).await {
2708            Ok(_) => Ok(true),
2709            Err(err) => {
2710                if let Some(error) = err.as_client_api_error()
2711                    && error.status_code == 404
2712                {
2713                    Ok(false)
2714                } else {
2715                    Err(err.into())
2716                }
2717            }
2718        }
2719    }
2720
2721    /// Synchronize the client's state with the latest state on the server.
2722    ///
2723    /// ## Syncing Events
2724    ///
2725    /// Messages or any other type of event need to be periodically fetched from
2726    /// the server, this is achieved by sending a `/sync` request to the server.
2727    ///
2728    /// The first sync is sent out without a [`token`]. The response of the
2729    /// first sync will contain a [`next_batch`] field which should then be
2730    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2731    /// don't receive the same events multiple times.
2732    ///
2733    /// ## Long Polling
2734    ///
2735    /// A sync should in the usual case always be in flight. The
2736    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2737    /// long the server will wait for new events before it will respond.
2738    /// The server will respond immediately if some new events arrive before the
2739    /// timeout has expired. If no changes arrive and the timeout expires an
2740    /// empty sync response will be sent to the client.
2741    ///
2742    /// This method of sending a request that may not receive a response
2743    /// immediately is called long polling.
2744    ///
2745    /// ## Filtering Events
2746    ///
2747    /// The number or type of messages and events that the client should receive
2748    /// from the server can be altered using a [`Filter`].
2749    ///
2750    /// Filters can be non-trivial and, since they will be sent with every sync
2751    /// request, they may take up a bunch of unnecessary bandwidth.
2752    ///
    /// Luckily filters can be uploaded to the server and reused using a unique
2754    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2755    /// method.
2756    ///
2757    /// # Arguments
2758    ///
2759    /// * `sync_settings` - Settings for the sync call, this allows us to set
2760    /// various options to configure the sync:
2761    ///     * [`filter`] - To configure which events we receive and which get
2762    ///       [filtered] by the server
2763    ///     * [`timeout`] - To configure our [long polling] setup.
2764    ///     * [`token`] - To tell the server which events we already received
2765    ///       and where we wish to continue syncing.
2766    ///     * [`full_state`] - To tell the server that we wish to receive all
2767    ///       state events, regardless of our configured [`token`].
2768    ///     * [`set_presence`] - To tell the server to set the presence and to
2769    ///       which state.
2770    ///
2771    /// # Examples
2772    ///
2773    /// ```no_run
2774    /// # use url::Url;
2775    /// # async {
2776    /// # let homeserver = Url::parse("http://localhost:8080")?;
2777    /// # let username = "";
2778    /// # let password = "";
2779    /// use matrix_sdk::{
2780    ///     Client, config::SyncSettings,
2781    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2782    /// };
2783    ///
2784    /// let client = Client::new(homeserver).await?;
2785    /// client.matrix_auth().login_username(username, password).send().await?;
2786    ///
2787    /// // Sync once so we receive the client state and old messages.
2788    /// client.sync_once(SyncSettings::default()).await?;
2789    ///
2790    /// // Register our handler so we start responding once we receive a new
2791    /// // event.
2792    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2793    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2794    /// });
2795    ///
2796    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2797    /// // from our `sync_once()` call automatically.
2798    /// client.sync(SyncSettings::default()).await;
2799    /// # anyhow::Ok(()) };
2800    /// ```
2801    ///
2802    /// [`sync`]: #method.sync
2803    /// [`SyncSettings`]: crate::config::SyncSettings
2804    /// [`token`]: crate::config::SyncSettings#method.token
2805    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2806    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2807    /// [`set_presence`]: ruma::presence::PresenceState
2808    /// [`filter`]: crate::config::SyncSettings#method.filter
2809    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2810    /// [`next_batch`]: SyncResponse#structfield.next_batch
2811    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2812    /// [long polling]: #long-polling
2813    /// [filtered]: #filtering-events
2814    #[instrument(skip(self))]
2815    pub async fn sync_once(
2816        &self,
2817        sync_settings: crate::config::SyncSettings,
2818    ) -> Result<SyncResponse> {
2819        // The sync might not return for quite a while due to the timeout.
2820        // We'll see if there's anything crypto related to send out before we
2821        // sync, i.e. if we closed our client after a sync but before the
2822        // crypto requests were sent out.
2823        //
2824        // This will mostly be a no-op.
2825        #[cfg(feature = "e2e-encryption")]
2826        if let Err(e) = self.send_outgoing_requests().await {
2827            error!(error = ?e, "Error while sending outgoing E2EE requests");
2828        }
2829
2830        let token = match sync_settings.token {
2831            SyncToken::Specific(token) => Some(token),
2832            SyncToken::NoToken => None,
2833            SyncToken::ReusePrevious => self.sync_token().await,
2834        };
2835
2836        let request = assign!(sync_events::v3::Request::new(), {
2837            filter: sync_settings.filter.map(|f| *f),
2838            since: token,
2839            full_state: sync_settings.full_state,
2840            set_presence: sync_settings.set_presence,
2841            timeout: sync_settings.timeout,
2842            use_state_after: true,
2843        });
2844        let mut request_config = self.request_config();
2845        if let Some(timeout) = sync_settings.timeout {
2846            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2847            request_config.timeout = Some(base_timeout + timeout);
2848        }
2849
2850        let response = self.send(request).with_request_config(request_config).await?;
2851        let next_batch = response.next_batch.clone();
2852        let response = self.process_sync(response).await?;
2853
2854        #[cfg(feature = "e2e-encryption")]
2855        if let Err(e) = self.send_outgoing_requests().await {
2856            error!(error = ?e, "Error while sending outgoing E2EE requests");
2857        }
2858
2859        self.inner.sync_beat.notify(usize::MAX);
2860
2861        Ok(SyncResponse::new(next_batch, response))
2862    }
2863
2864    /// Repeatedly synchronize the client state with the server.
2865    ///
2866    /// This method will only return on error, if cancellation is needed
2867    /// the method should be wrapped in a cancelable task or the
2868    /// [`Client::sync_with_callback`] method can be used or
2869    /// [`Client::sync_with_result_callback`] if you want to handle error
2870    /// cases in the loop, too.
2871    ///
2872    /// This method will internally call [`Client::sync_once`] in a loop.
2873    ///
2874    /// This method can be used with the [`Client::add_event_handler`]
2875    /// method to react to individual events. If you instead wish to handle
2876    /// events in a bulk manner the [`Client::sync_with_callback`],
2877    /// [`Client::sync_with_result_callback`] and
2878    /// [`Client::sync_stream`] methods can be used instead. Those methods
2879    /// repeatedly return the whole sync response.
2880    ///
2881    /// # Arguments
2882    ///
2883    /// * `sync_settings` - Settings for the sync call. *Note* that those
2884    ///   settings will be only used for the first sync call. See the argument
2885    ///   docs for [`Client::sync_once`] for more info.
2886    ///
2887    /// # Return
2888    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2889    /// up to the user of the API to check the error and decide whether the sync
2890    /// should continue or not.
2891    ///
2892    /// # Examples
2893    ///
2894    /// ```no_run
2895    /// # use url::Url;
2896    /// # async {
2897    /// # let homeserver = Url::parse("http://localhost:8080")?;
2898    /// # let username = "";
2899    /// # let password = "";
2900    /// use matrix_sdk::{
2901    ///     Client, config::SyncSettings,
2902    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2903    /// };
2904    ///
2905    /// let client = Client::new(homeserver).await?;
2906    /// client.matrix_auth().login_username(&username, &password).send().await?;
2907    ///
2908    /// // Register our handler so we start responding once we receive a new
2909    /// // event.
2910    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2911    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2912    /// });
2913    ///
2914    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2915    /// // automatically.
2916    /// client.sync(SyncSettings::default()).await?;
2917    /// # anyhow::Ok(()) };
2918    /// ```
2919    ///
2920    /// [argument docs]: #method.sync_once
2921    /// [`sync_with_callback`]: #method.sync_with_callback
2922    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2923        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2924    }
2925
2926    /// Repeatedly call sync to synchronize the client state with the server.
2927    ///
2928    /// # Arguments
2929    ///
2930    /// * `sync_settings` - Settings for the sync call. *Note* that those
2931    ///   settings will be only used for the first sync call. See the argument
2932    ///   docs for [`Client::sync_once`] for more info.
2933    ///
    /// * `callback` - A callback that will be called every time a successful
    ///   response has been fetched from the server. The callback must return a
    ///   `LoopCtrl` which signals whether the method should stop syncing. If
    ///   the callback returns `LoopCtrl::Continue` the sync will continue, if
    ///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2939    ///
2940    /// # Return
2941    /// The sync runs until an error occurs or the
2942    /// callback indicates that the Loop should stop. If the callback asked for
2943    /// a regular stop, the result will be `Ok(())` otherwise the
2944    /// `Err(Error)` is returned.
2945    ///
2946    /// # Examples
2947    ///
2948    /// The following example demonstrates how to sync forever while sending all
2949    /// the interesting events through a mpsc channel to another thread e.g. a
2950    /// UI thread.
2951    ///
2952    /// ```no_run
2953    /// # use std::time::Duration;
2954    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2955    /// # use url::Url;
2956    /// # async {
2957    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2958    /// # let mut client = Client::new(homeserver).await.unwrap();
2959    ///
2960    /// use tokio::sync::mpsc::channel;
2961    ///
2962    /// let (tx, rx) = channel(100);
2963    ///
2964    /// let sync_channel = &tx;
2965    /// let sync_settings = SyncSettings::new()
2966    ///     .timeout(Duration::from_secs(30));
2967    ///
2968    /// client
2969    ///     .sync_with_callback(sync_settings, |response| async move {
2970    ///         let channel = sync_channel;
2971    ///         for (room_id, room) in response.rooms.joined {
2972    ///             for event in room.timeline.events {
2973    ///                 channel.send(event).await.unwrap();
2974    ///             }
2975    ///         }
2976    ///
2977    ///         LoopCtrl::Continue
2978    ///     })
2979    ///     .await;
2980    /// };
2981    /// ```
2982    #[instrument(skip_all)]
2983    pub async fn sync_with_callback<C>(
2984        &self,
2985        sync_settings: crate::config::SyncSettings,
2986        callback: impl Fn(SyncResponse) -> C,
2987    ) -> Result<(), Error>
2988    where
2989        C: Future<Output = LoopCtrl>,
2990    {
2991        self.sync_with_result_callback(sync_settings, |result| async {
2992            Ok(callback(result?).await)
2993        })
2994        .await
2995    }
2996
2997    /// Repeatedly call sync to synchronize the client state with the server.
2998    ///
2999    /// # Arguments
3000    ///
3001    /// * `sync_settings` - Settings for the sync call. *Note* that those
3002    ///   settings will be only used for the first sync call. See the argument
3003    ///   docs for [`Client::sync_once`] for more info.
3004    ///
3005    /// * `callback` - A callback that will be called every time after a
3006    ///   response has been received, failure or not. The callback returns a
3007    ///   `Result<LoopCtrl, Error>`, too. When returning
3008    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3009    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3010    ///   function returns `Ok(())`. In case the callback can't handle the
3011    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
3012    ///   which results in the sync ending and the `Err(Error)` being returned.
3013    ///
3014    /// # Return
3015    /// The sync runs until an error occurs that the callback can't handle or
3016    /// the callback indicates that the Loop should stop. If the callback
3017    /// asked for a regular stop, the result will be `Ok(())` otherwise the
3018    /// `Err(Error)` is returned.
3019    ///
3020    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
3021    /// this, and are handled first without sending the result to the
3022    /// callback. Only after they have exceeded is the `Result` handed to
3023    /// the callback.
3024    ///
3025    /// # Examples
3026    ///
3027    /// The following example demonstrates how to sync forever while sending all
3028    /// the interesting events through a mpsc channel to another thread e.g. a
3029    /// UI thread.
3030    ///
3031    /// ```no_run
3032    /// # use std::time::Duration;
3033    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3034    /// # use url::Url;
3035    /// # async {
3036    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3037    /// # let mut client = Client::new(homeserver).await.unwrap();
3038    /// #
3039    /// use tokio::sync::mpsc::channel;
3040    ///
3041    /// let (tx, rx) = channel(100);
3042    ///
3043    /// let sync_channel = &tx;
3044    /// let sync_settings = SyncSettings::new()
3045    ///     .timeout(Duration::from_secs(30));
3046    ///
3047    /// client
3048    ///     .sync_with_result_callback(sync_settings, |response| async move {
3049    ///         let channel = sync_channel;
3050    ///         let sync_response = response?;
3051    ///         for (room_id, room) in sync_response.rooms.joined {
3052    ///              for event in room.timeline.events {
3053    ///                  channel.send(event).await.unwrap();
3054    ///               }
3055    ///         }
3056    ///
3057    ///         Ok(LoopCtrl::Continue)
3058    ///     })
3059    ///     .await;
3060    /// };
3061    /// ```
3062    #[instrument(skip(self, callback))]
3063    pub async fn sync_with_result_callback<C>(
3064        &self,
3065        sync_settings: crate::config::SyncSettings,
3066        callback: impl Fn(Result<SyncResponse, Error>) -> C,
3067    ) -> Result<(), Error>
3068    where
3069        C: Future<Output = Result<LoopCtrl, Error>>,
3070    {
3071        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3072
3073        while let Some(result) = sync_stream.next().await {
3074            trace!("Running callback");
3075            if callback(result).await? == LoopCtrl::Break {
3076                trace!("Callback told us to stop");
3077                break;
3078            }
3079            trace!("Done running callback");
3080        }
3081
3082        Ok(())
3083    }
3084
    /// Repeatedly synchronize the client state with the server.
    ///
    /// This method will internally call [`Client::sync_once`] in a loop and is
    /// equivalent to the [`Client::sync`] method but the responses are provided
    /// as an async stream.
    ///
    /// The returned stream loops without a terminating condition of its own:
    /// the outcome of every sync iteration, successful or not, is yielded as
    /// an item, and it is up to the consumer to stop polling.
    ///
    /// # Arguments
    ///
    /// * `sync_settings` - Settings for the sync call. *Note* that those
    ///   settings will be only used for the first sync call. See the argument
    ///   docs for [`Client::sync_once`] for more info.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use url::Url;
    /// # async {
    /// # let homeserver = Url::parse("http://localhost:8080")?;
    /// # let username = "";
    /// # let password = "";
    /// use futures_util::StreamExt;
    /// use matrix_sdk::{Client, config::SyncSettings};
    ///
    /// let client = Client::new(homeserver).await?;
    /// client.matrix_auth().login_username(&username, &password).send().await?;
    ///
    /// let mut sync_stream =
    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
    ///
    /// while let Some(Ok(response)) = sync_stream.next().await {
    ///     for room in response.rooms.joined.values() {
    ///         for e in &room.timeline.events {
    ///             if let Ok(event) = e.raw().deserialize() {
    ///                 println!("Received event {:?}", event);
    ///             }
    ///         }
    ///     }
    /// }
    ///
    /// # anyhow::Ok(()) };
    /// ```
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(skip(self))]
    pub async fn sync_stream(
        &self,
        mut sync_settings: crate::config::SyncSettings,
    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
        // Only consulted when `ignore_timeout_on_first_sync` is set.
        let mut is_first_sync = true;
        // Parks the configured timeout while it is withheld from the first
        // sync.
        let mut timeout = None;
        let mut last_sync_time: Option<Instant> = None;

        // Captured up front so that every iteration of the stream is
        // instrumented with the span that was current when `sync_stream` was
        // called.
        let parent_span = Span::current();

        async_stream::stream!({
            loop {
                trace!("Syncing");

                if sync_settings.ignore_timeout_on_first_sync {
                    if is_first_sync {
                        // First iteration: move the timeout out of the
                        // settings so this sync runs without it.
                        timeout = sync_settings.timeout.take();
                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
                        // Later iterations: put the parked timeout back, unless
                        // the settings already carry one.
                        sync_settings.timeout = timeout.take();
                    }

                    is_first_sync = false;
                }

                yield self
                    .sync_loop_helper(&mut sync_settings)
                    .instrument(parent_span.clone())
                    .await;

                Client::delay_sync(&mut last_sync_time).await
            }
        })
    }
3161
3162    /// Get the current, if any, sync token of the client.
3163    /// This will be None if the client didn't sync at least once.
3164    pub(crate) async fn sync_token(&self) -> Option<String> {
3165        self.inner.base_client.sync_token().await
3166    }
3167
3168    /// Gets information about the owner of a given access token.
3169    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3170        let request = whoami::v3::Request::new();
3171        self.send(request).await
3172    }
3173
3174    /// Subscribes a new receiver to client SessionChange broadcasts.
3175    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3176        let broadcast = &self.auth_ctx().session_change_sender;
3177        broadcast.subscribe()
3178    }
3179
3180    /// Sets the save/restore session callbacks.
3181    ///
3182    /// This is another mechanism to get synchronous updates to session tokens,
3183    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3184    pub fn set_session_callbacks(
3185        &self,
3186        reload_session_callback: Box<ReloadSessionCallback>,
3187        save_session_callback: Box<SaveSessionCallback>,
3188    ) -> Result<()> {
3189        self.inner
3190            .auth_ctx
3191            .reload_session_callback
3192            .set(reload_session_callback)
3193            .map_err(|_| Error::MultipleSessionCallbacks)?;
3194
3195        self.inner
3196            .auth_ctx
3197            .save_session_callback
3198            .set(save_session_callback)
3199            .map_err(|_| Error::MultipleSessionCallbacks)?;
3200
3201        Ok(())
3202    }
3203
3204    /// Get the notification settings of the current owner of the client.
3205    pub async fn notification_settings(&self) -> NotificationSettings {
3206        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3207        NotificationSettings::new(self.clone(), ruleset)
3208    }
3209
    /// Create a new specialized `Client` that can process notifications.
    ///
    /// The new client is built from a fresh [`ClientInner`] that reuses most
    /// of this client's components (cloned or copied from `self`), but whose
    /// base client is obtained through `clone_with_in_memory_state_store`.
    ///
    /// See [`CrossProcessLock::new`] to learn more about
    /// `cross_process_lock_config`.
    ///
    /// # Errors
    ///
    /// Fails if `clone_with_in_memory_state_store` fails for the base client.
    ///
    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
    pub async fn notification_client(
        &self,
        cross_process_lock_config: CrossProcessLockConfig,
    ) -> Result<Client> {
        // NOTE: `ClientInner::new` takes its arguments positionally; the order
        // below, including the `cfg`-gated entries, must match its signature.
        let client = Client {
            inner: ClientInner::new(
                self.inner.auth_ctx.clone(),
                self.server().cloned(),
                self.homeserver(),
                self.sliding_sync_version(),
                self.inner.http_client.clone(),
                // The lock config is cloned here because it is also passed to
                // `ClientInner::new` itself further down.
                // NOTE(review): the meaning of the `false` flag isn't visible
                // from this file section; confirm against `BaseClient`.
                self.inner
                    .base_client
                    .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
                    .await?,
                self.inner.caches.supported_versions.value(),
                self.inner.caches.well_known.value(),
                self.inner.respect_login_well_known,
                self.inner.event_cache.clone(),
                self.inner.send_queue_data.clone(),
                self.inner.latest_events.clone(),
                #[cfg(feature = "e2e-encryption")]
                self.inner.e2ee.encryption_settings,
                #[cfg(feature = "e2e-encryption")]
                self.inner.enable_share_history_on_invite,
                cross_process_lock_config,
                #[cfg(feature = "experimental-search")]
                self.inner.search_index.clone(),
                self.inner.thread_subscription_catchup.clone(),
            )
            .await,
        };

        Ok(client)
    }
3251
3252    /// The [`EventCache`] instance for this [`Client`].
3253    pub fn event_cache(&self) -> &EventCache {
3254        // SAFETY: always initialized in the `Client` ctor.
3255        self.inner.event_cache.get().unwrap()
3256    }
3257
3258    /// The [`LatestEvents`] instance for this [`Client`].
3259    pub async fn latest_events(&self) -> &LatestEvents {
3260        self.inner
3261            .latest_events
3262            .get_or_init(|| async {
3263                LatestEvents::new(
3264                    WeakClient::from_client(self),
3265                    self.event_cache().clone(),
3266                    SendQueue::new(self.clone()),
3267                    self.room_info_notable_update_receiver(),
3268                )
3269            })
3270            .await
3271    }
3272
3273    /// Waits until an at least partially synced room is received, and returns
3274    /// it.
3275    ///
3276    /// **Note: this function will loop endlessly until either it finds the room
3277    /// or an externally set timeout happens.**
3278    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3279        loop {
3280            if let Some(room) = self.get_room(room_id) {
3281                if room.is_state_partially_or_fully_synced() {
3282                    debug!("Found just created room!");
3283                    return room;
3284                }
3285                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3286            } else {
3287                debug!("Room wasn't found, waiting for sync beat to try again");
3288            }
3289            self.inner.sync_beat.listen().await;
3290        }
3291    }
3292
3293    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3294    /// join it.
3295    pub async fn knock(
3296        &self,
3297        room_id_or_alias: OwnedRoomOrAliasId,
3298        reason: Option<String>,
3299        server_names: Vec<OwnedServerName>,
3300    ) -> Result<Room> {
3301        let request =
3302            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3303        let response = self.send(request).await?;
3304        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3305        Ok(Room::new(self.clone(), base_room))
3306    }
3307
3308    /// Checks whether the provided `user_id` belongs to an ignored user.
3309    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3310        self.base_client().is_user_ignored(user_id).await
3311    }
3312
3313    /// Gets the `max_upload_size` value from the homeserver, getting either a
3314    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3315    /// missing.
3316    ///
3317    /// Check the spec for more info:
3318    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3319    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3320        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3321        if let Some(data) = max_upload_size_lock.get() {
3322            return Ok(data.to_owned());
3323        }
3324
3325        // Use the authenticated endpoint when the server supports it.
3326        let supported_versions = self.supported_versions().await?;
3327        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3328            .is_supported(&supported_versions);
3329
3330        let upload_size = if use_auth {
3331            self.send(authenticated_media::get_media_config::v1::Request::default())
3332                .await?
3333                .upload_size
3334        } else {
3335            #[allow(deprecated)]
3336            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3337        };
3338
3339        match max_upload_size_lock.set(upload_size) {
3340            Ok(_) => Ok(upload_size),
3341            Err(error) => {
3342                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3343            }
3344        }
3345    }
3346
3347    /// The settings to use for decrypting events.
3348    #[cfg(feature = "e2e-encryption")]
3349    pub fn decryption_settings(&self) -> &DecryptionSettings {
3350        &self.base_client().decryption_settings
3351    }
3352
3353    /// Returns the [`SearchIndex`] for this [`Client`].
3354    #[cfg(feature = "experimental-search")]
3355    pub fn search_index(&self) -> &SearchIndex {
3356        &self.inner.search_index
3357    }
3358
3359    /// Whether the client is configured to take thread subscriptions (MSC4306
3360    /// and MSC4308) into account, and the server enabled the experimental
3361    /// feature flag for it.
3362    ///
3363    /// This may cause filtering out of thread subscriptions, and loading the
3364    /// thread subscriptions via the sliding sync extension, when the room
3365    /// list service is being used.
3366    ///
3367    /// This is async and fallible as it may use the network to retrieve the
3368    /// server supported features, if they aren't cached already.
3369    pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3370        // Check if the client is configured to support thread subscriptions first.
3371        match self.base_client().threading_support {
3372            ThreadingSupport::Enabled { with_subscriptions: false }
3373            | ThreadingSupport::Disabled => return Ok(false),
3374            ThreadingSupport::Enabled { with_subscriptions: true } => {}
3375        }
3376
3377        // Now, let's check that the server supports it.
3378        let server_enabled = self
3379            .supported_versions()
3380            .await?
3381            .features
3382            .contains(&FeatureFlag::from("org.matrix.msc4306"));
3383
3384        Ok(server_enabled)
3385    }
3386
3387    /// Fetch thread subscriptions changes between `from` and up to `to`.
3388    ///
3389    /// The `limit` optional parameter can be used to limit the number of
3390    /// entries in a response. It can also be overridden by the server, if
3391    /// it's deemed too large.
3392    pub async fn fetch_thread_subscriptions(
3393        &self,
3394        from: Option<String>,
3395        to: Option<String>,
3396        limit: Option<UInt>,
3397    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3398        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3399            from,
3400            to,
3401            limit,
3402        });
3403        Ok(self.send(request).await?)
3404    }
3405
3406    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3407        self.inner.thread_subscription_catchup.get().unwrap()
3408    }
3409
    /// Perform database optimizations if any are available, i.e. vacuuming in
    /// SQLite.
    ///
    /// The state store, the event cache store and the media store are
    /// optimized one after the other, in that order. The first error
    /// encountered is returned, and the remaining stores are then left
    /// untouched.
    ///
    /// **Warning:** this was added to check if SQLite fragmentation was the
    /// source of performance issues, **DO NOT use in production**.
    #[doc(hidden)]
    pub async fn optimize_stores(&self) -> Result<()> {
        trace!("Optimizing state store...");
        self.state_store().optimize().await?;

        trace!("Optimizing event cache store...");
        // The event cache store is only optimized when `as_clean()` yields a
        // lock; otherwise this step is skipped without reporting an error.
        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
            clean_lock.optimize().await?;
        }

        trace!("Optimizing media store...");
        self.media_store().lock().await?.optimize().await?;

        Ok(())
    }
3430
    /// Returns the sizes of the existing stores, if known.
    ///
    /// A store whose size can't be computed, or whose size query fails, is
    /// reported as `None` rather than failing the whole call.
    ///
    /// # Errors
    ///
    /// Fails only if taking the lock of the event cache store or of the media
    /// store fails.
    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
        // The crypto store is only available with the `e2e-encryption`
        // feature, and only once an Olm machine exists.
        #[cfg(feature = "e2e-encryption")]
        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
        {
            Some(store_size)
        } else {
            None
        };
        #[cfg(not(feature = "e2e-encryption"))]
        let crypto_store_size = None;

        // Errors are deliberately mapped to `None` here.
        let state_store_size = self.state_store().get_size().await.ok().flatten();

        // Only measured when `as_clean()` yields a lock; `None` otherwise.
        let event_cache_store_size = if let Some(clean_lock) =
            self.event_cache_store().lock().await?.as_clean()
            && let Ok(Some(store_size)) = clean_lock.get_size().await
        {
            Some(store_size)
        } else {
            None
        };

        // Lock errors are propagated, size errors are mapped to `None`.
        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();

        Ok(StoreSizes {
            crypto_store: crypto_store_size,
            state_store: state_store_size,
            event_cache_store: event_cache_store_size,
            media_store: media_store_size,
        })
    }
3464
3465    /// Get a reference to the client's task monitor, for spawning background
3466    /// tasks.
3467    pub fn task_monitor(&self) -> &TaskMonitor {
3468        &self.inner.task_monitor
3469    }
3470
3471    /// Add a subscriber for duplicate key upload error notifications triggered
3472    /// by requests to /keys/upload.
3473    #[cfg(feature = "e2e-encryption")]
3474    pub fn subscribe_to_duplicate_key_upload_errors(
3475        &self,
3476    ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3477        self.inner.duplicate_key_upload_error_sender.subscribe()
3478    }
3479
3480    /// Check the record of whether we are waiting for an [MSC4268] key bundle
3481    /// for the given room.
3482    ///
3483    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3484    #[cfg(feature = "e2e-encryption")]
3485    pub async fn get_pending_key_bundle_details_for_room(
3486        &self,
3487        room_id: &RoomId,
3488    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3489        Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3490    }
3491
3492    /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3493    /// a DM.
3494    pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3495        &self.inner.base_client.dm_room_definition
3496    }
3497}
3498
/// Contains the disk size of the different stores, if known. It won't be
/// available for in-memory stores.
///
/// Every field is `None` when the size of the corresponding store is
/// unknown, which is also what [`StoreSizes::default`] produces for all of
/// them.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreSizes {
    /// The size of the CryptoStore.
    pub crypto_store: Option<usize>,
    /// The size of the StateStore.
    pub state_store: Option<usize>,
    /// The size of the EventCacheStore.
    pub event_cache_store: Option<usize>,
    /// The size of the MediaStore.
    pub media_store: Option<usize>,
}
3512
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper making the crypto layer track the given users.
    ///
    /// Panics if there is no Olm machine, or if updating the tracked users
    /// fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let olm_machine_guard = self.olm_machine().await;
        let olm_machine = olm_machine_guard.as_ref().unwrap();
        let update_result = olm_machine.update_tracked_users(user_ids).await;
        update_result.unwrap();
    }
}
3526
3527/// A weak reference to the inner client, useful when trying to get a handle
3528/// on the owning client.
3529#[derive(Clone, Debug)]
3530pub(crate) struct WeakClient {
3531    client: Weak<ClientInner>,
3532}
3533
3534impl WeakClient {
3535    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3536    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3537        Self { client: Arc::downgrade(client) }
3538    }
3539
3540    /// Construct a [`WeakClient`] from a [`Client`].
3541    pub fn from_client(client: &Client) -> Self {
3542        Self::from_inner(&client.inner)
3543    }
3544
3545    /// Attempts to get a [`Client`] from this [`WeakClient`].
3546    pub fn get(&self) -> Option<Client> {
3547        self.client.upgrade().map(|inner| Client { inner })
3548    }
3549
3550    /// Gets the number of strong (`Arc`) pointers still pointing to this
3551    /// client.
3552    #[allow(dead_code)]
3553    pub fn strong_count(&self) -> usize {
3554        self.client.strong_count()
3555    }
3556}
3557
/// Information about the state of a room before we joined it.
///
/// The `Default` value carries no information, i.e. no inviter.
#[derive(Debug, Clone, Default)]
struct PreJoinRoomInfo {
    /// The user who invited us to the room, if any.
    pub inviter: Option<RoomMember>,
}
3564
3565// The http mocking library is not supported for wasm32
3566#[cfg(all(test, not(target_family = "wasm")))]
3567pub(crate) mod tests {
3568    use std::{sync::Arc, time::Duration};
3569
3570    use assert_matches::assert_matches;
3571    use assert_matches2::assert_let;
3572    use eyeball::SharedObservable;
3573    use futures_util::{FutureExt, StreamExt, pin_mut};
3574    use js_int::{UInt, uint};
3575    use matrix_sdk_base::{
3576        RoomState,
3577        store::{MemoryStore, StoreConfig},
3578        ttl::TtlValue,
3579    };
3580    use matrix_sdk_test::{
3581        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3582        event_factory::EventFactory,
3583    };
3584    #[cfg(target_family = "wasm")]
3585    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3586
3587    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3588    use ruma::{
3589        RoomId, ServerName, UserId,
3590        api::{
3591            FeatureFlag, MatrixVersion,
3592            client::{
3593                discovery::discover_homeserver::RtcFocusInfo,
3594                room::create_room::v3::Request as CreateRoomRequest,
3595            },
3596        },
3597        assign,
3598        events::{
3599            ignored_user_list::IgnoredUserListEventContent,
3600            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3601        },
3602        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3603    };
3604    use serde_json::json;
3605    use stream_assert::{assert_next_matches, assert_pending};
3606    use tokio::{
3607        spawn,
3608        time::{sleep, timeout},
3609    };
3610    use url::Url;
3611
3612    use super::Client;
3613    use crate::{
3614        Error, TransmissionProgress,
3615        client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3616        config::{RequestConfig, SyncSettings},
3617        futures::SendRequest,
3618        media::MediaError,
3619        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3620    };
3621
3622    #[async_test]
3623    async fn test_account_data() {
3624        let server = MatrixMockServer::new().await;
3625        let client = server.client_builder().build().await;
3626
3627        let f = EventFactory::new();
3628        server
3629            .mock_sync()
3630            .ok_and_run(&client, |builder| {
3631                builder.add_global_account_data(
3632                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3633                );
3634            })
3635            .await;
3636
3637        let content = client
3638            .account()
3639            .account_data::<IgnoredUserListEventContent>()
3640            .await
3641            .unwrap()
3642            .unwrap()
3643            .deserialize()
3644            .unwrap();
3645
3646        assert_eq!(content.ignored_users.len(), 1);
3647    }
3648
3649    #[async_test]
3650    async fn test_successful_discovery() {
3651        // Imagine this is `matrix.org`.
3652        let server = MatrixMockServer::new().await;
3653        let server_url = server.uri();
3654
3655        // Imagine this is `matrix-client.matrix.org`.
3656        let homeserver = MatrixMockServer::new().await;
3657        let homeserver_url = homeserver.uri();
3658
3659        // Imagine Alice has the user ID `@alice:matrix.org`.
3660        let domain = server_url.strip_prefix("http://").unwrap();
3661        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3662
3663        // The `.well-known` is on the server (e.g. `matrix.org`).
3664        server
3665            .mock_well_known()
3666            .ok_with_homeserver_url(&homeserver_url)
3667            .mock_once()
3668            .named("well-known")
3669            .mount()
3670            .await;
3671
3672        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3673        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3674
3675        let client = Client::builder()
3676            .insecure_server_name_no_tls(alice.server_name())
3677            .build()
3678            .await
3679            .unwrap();
3680
3681        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3682        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3683        client.server_versions().await.unwrap();
3684    }
3685
3686    #[async_test]
3687    async fn test_discovery_broken_server() {
3688        let server = MatrixMockServer::new().await;
3689        let server_url = server.uri();
3690        let domain = server_url.strip_prefix("http://").unwrap();
3691        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3692
3693        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3694
3695        assert!(
3696            Client::builder()
3697                .insecure_server_name_no_tls(alice.server_name())
3698                .build()
3699                .await
3700                .is_err(),
3701            "Creating a client from a user ID should fail when the .well-known request fails."
3702        );
3703    }
3704
3705    #[async_test]
3706    async fn test_room_creation() {
3707        let server = MatrixMockServer::new().await;
3708        let client = server.client_builder().build().await;
3709
3710        let f = EventFactory::new().sender(user_id!("@example:localhost"));
3711        server
3712            .mock_sync()
3713            .ok_and_run(&client, |builder| {
3714                builder.add_joined_room(
3715                    JoinedRoomBuilder::default()
3716                        .add_state_event(
3717                            f.member(user_id!("@example:localhost")).display_name("example"),
3718                        )
3719                        .add_state_event(f.default_power_levels()),
3720                );
3721            })
3722            .await;
3723
3724        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3725        assert_eq!(room.state(), RoomState::Joined);
3726    }
3727
3728    #[async_test]
3729    async fn test_retry_limit_http_requests() {
3730        let server = MatrixMockServer::new().await;
3731        let client = server
3732            .client_builder()
3733            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3734            .build()
3735            .await;
3736
3737        assert!(client.request_config().retry_limit.unwrap() == 4);
3738
3739        server.mock_who_am_i().error500().expect(4).mount().await;
3740
3741        client.whoami().await.unwrap_err();
3742    }
3743
3744    #[async_test]
3745    async fn test_retry_timeout_http_requests() {
3746        // Keep this timeout small so that the test doesn't take long
3747        let retry_timeout = Duration::from_secs(5);
3748        let server = MatrixMockServer::new().await;
3749        let client = server
3750            .client_builder()
3751            .on_builder(|builder| {
3752                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3753            })
3754            .build()
3755            .await;
3756
3757        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3758
3759        server.mock_login().error500().expect(2..).mount().await;
3760
3761        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3762    }
3763
3764    #[async_test]
3765    async fn test_short_retry_initial_http_requests() {
3766        let server = MatrixMockServer::new().await;
3767        let client = server
3768            .client_builder()
3769            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3770            .build()
3771            .await;
3772
3773        server.mock_login().error500().expect(3..).mount().await;
3774
3775        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3776    }
3777
3778    #[async_test]
3779    async fn test_no_retry_http_requests() {
3780        let server = MatrixMockServer::new().await;
3781        let client = server.client_builder().build().await;
3782
3783        server.mock_devices().error500().mock_once().mount().await;
3784
3785        client.devices().await.unwrap_err();
3786    }
3787
3788    #[async_test]
3789    async fn test_set_homeserver() {
3790        let client = MockClientBuilder::new(None).build().await;
3791        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3792
3793        let homeserver = Url::parse("http://example.com/").unwrap();
3794        client.set_homeserver(homeserver.clone());
3795        assert_eq!(client.homeserver(), homeserver);
3796    }
3797
3798    #[async_test]
3799    async fn test_search_user_request() {
3800        let server = MatrixMockServer::new().await;
3801        let client = server.client_builder().build().await;
3802
3803        server.mock_user_directory().ok().mock_once().mount().await;
3804
3805        let response = client.search_users("test", 50).await.unwrap();
3806        assert_eq!(response.results.len(), 1);
3807        let result = &response.results[0];
3808        assert_eq!(result.user_id.to_string(), "@test:example.me");
3809        assert_eq!(result.display_name.clone().unwrap(), "Test");
3810        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3811        assert!(!response.limited);
3812    }
3813
3814    #[async_test]
3815    async fn test_request_unstable_features() {
3816        let server = MatrixMockServer::new().await;
3817        let client = server.client_builder().no_server_versions().build().await;
3818
3819        server
3820            .mock_versions()
3821            .with_feature("org.matrix.e2e_cross_signing", true)
3822            .ok()
3823            .mock_once()
3824            .mount()
3825            .await;
3826
3827        let unstable_features = client.unstable_features().await.unwrap();
3828        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3829        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3830    }
3831
3832    #[async_test]
3833    async fn test_can_homeserver_push_encrypted_event_to_device() {
3834        let server = MatrixMockServer::new().await;
3835        let client = server.client_builder().no_server_versions().build().await;
3836
3837        server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3838
3839        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3840        assert!(msc4028_enabled);
3841    }
3842
3843    #[async_test]
3844    async fn test_recently_visited_rooms() {
3845        // Tracking recently visited rooms requires authentication
3846        let client = MockClientBuilder::new(None).unlogged().build().await;
3847        assert_matches!(
3848            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3849            Err(Error::AuthenticationRequired)
3850        );
3851
3852        let client = MockClientBuilder::new(None).build().await;
3853        let account = client.account();
3854
3855        // We should start off with an empty list
3856        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3857
3858        // Tracking a valid room id should add it to the list
3859        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3860        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3861        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3862
3863        // And the existing list shouldn't be changed
3864        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3865        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3866
3867        // Tracking the same room again shouldn't change the list
3868        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3869        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3870        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3871
3872        // Tracking a second room should add it to the front of the list
3873        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3874        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3875        assert_eq!(
3876            account.get_recently_visited_rooms().await.unwrap(),
3877            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3878        );
3879
3880        // Tracking the first room yet again should move it to the front of the list
3881        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3882        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3883        assert_eq!(
3884            account.get_recently_visited_rooms().await.unwrap(),
3885            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3886        );
3887
3888        // Tracking should be capped at 20
3889        for n in 0..20 {
3890            account
3891                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3892                .await
3893                .unwrap();
3894        }
3895
3896        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3897
3898        // And the initial rooms should've been pushed out
3899        let rooms = account.get_recently_visited_rooms().await.unwrap();
3900        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3901        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3902
3903        // And the last tracked room should be the first
3904        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3905    }
3906
3907    #[async_test]
3908    async fn test_client_no_cycle_with_event_cache() {
3909        let client = MockClientBuilder::new(None).build().await;
3910
3911        // Wait for the init tasks to die.
3912        sleep(Duration::from_secs(1)).await;
3913
3914        let weak_client = WeakClient::from_client(&client);
3915        assert_eq!(weak_client.strong_count(), 1);
3916
3917        {
3918            let room_id = room_id!("!room:example.org");
3919
3920            // Have the client know the room.
3921            let response = SyncResponseBuilder::default()
3922                .add_joined_room(JoinedRoomBuilder::new(room_id))
3923                .build_sync_response();
3924            client.inner.base_client.receive_sync_response(response).await.unwrap();
3925
3926            client.event_cache().subscribe().unwrap();
3927
3928            let (_room_event_cache, _drop_handles) =
3929                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3930        }
3931
3932        drop(client);
3933
3934        // Give a bit of time for background tasks to die.
3935        sleep(Duration::from_secs(1)).await;
3936
3937        // The weak client must be the last reference to the client now.
3938        assert_eq!(weak_client.strong_count(), 0);
3939        let client = weak_client.get();
3940        assert!(
3941            client.is_none(),
3942            "too many strong references to the client: {}",
3943            Arc::strong_count(&client.unwrap().inner)
3944        );
3945    }
3946
3947    #[async_test]
3948    async fn test_supported_versions_caching() {
3949        let server = MatrixMockServer::new().await;
3950
3951        let versions_mock = server
3952            .mock_versions()
3953            .expect_default_access_token()
3954            .with_feature("org.matrix.e2e_cross_signing", true)
3955            .ok()
3956            .named("first versions mock")
3957            .expect(1)
3958            .mount_as_scoped()
3959            .await;
3960
3961        let memory_store = Arc::new(MemoryStore::new());
3962        let client = server
3963            .client_builder()
3964            .no_server_versions()
3965            .on_builder(|builder| {
3966                builder.store_config(
3967                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3968                        .state_store(memory_store.clone()),
3969                )
3970            })
3971            .build()
3972            .await;
3973
3974        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3975
3976        // The result was cached.
3977        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3978        // This subsequent call hits the in-memory cache.
3979        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3980
3981        drop(client);
3982
3983        let client = server
3984            .client_builder()
3985            .no_server_versions()
3986            .on_builder(|builder| {
3987                builder.store_config(
3988                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3989                        .state_store(memory_store.clone()),
3990                )
3991            })
3992            .build()
3993            .await;
3994
3995        // These calls to the new client hit the on-disk cache.
3996        assert!(
3997            client
3998                .unstable_features()
3999                .await
4000                .unwrap()
4001                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
4002        );
4003
4004        let supported = client.supported_versions().await.unwrap();
4005        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4006        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4007
4008        // Then this call hits the in-memory cache.
4009        let supported = client.supported_versions().await.unwrap();
4010        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4011        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4012
4013        drop(versions_mock);
4014
4015        // Now, reset the cache, and observe the endpoint being called again once.
4016        client.reset_supported_versions().await.unwrap();
4017
4018        server.mock_versions().ok().expect(2).named("second versions mock").mount().await;
4019
4020        // Hits network again.
4021        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4022        // Hits in-memory cache again.
4023        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4024        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4025
4026        // Force an expiry of the data.
4027        let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
4028        let mut ttl_value = TtlValue::new(supported_versions);
4029        ttl_value.expire();
4030        client.inner.caches.supported_versions.set_value(ttl_value);
4031
4032        // Call the method to trigger a cache refresh background task.
4033        client.supported_versions_cached().await.unwrap().unwrap();
4034
4035        // We wait for the task to finish, the endpoint should have been called again.
4036        sleep(Duration::from_secs(1)).await;
4037        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4038    }
4039
4040    #[async_test]
4041    async fn test_well_known_caching() {
4042        let server = MatrixMockServer::new().await;
4043        let server_url = server.uri();
4044        let domain = server_url.strip_prefix("http://").unwrap();
4045        let server_name = <&ServerName>::try_from(domain).unwrap();
4046        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
4047
4048        let well_known_mock = server
4049            .mock_well_known()
4050            .ok()
4051            .named("well known mock")
4052            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
4053            .mount_as_scoped()
4054            .await;
4055
4056        let memory_store = Arc::new(MemoryStore::new());
4057        let client = Client::builder()
4058            .insecure_server_name_no_tls(server_name)
4059            .store_config(
4060                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4061                    .state_store(memory_store.clone()),
4062            )
4063            .build()
4064            .await
4065            .unwrap();
4066
4067        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4068
4069        // This subsequent call hits the in-memory cache.
4070        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4071
4072        drop(client);
4073
4074        let client = server
4075            .client_builder()
4076            .no_server_versions()
4077            .on_builder(|builder| {
4078                builder.store_config(
4079                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4080                        .state_store(memory_store.clone()),
4081                )
4082            })
4083            .build()
4084            .await;
4085
4086        // This call to the new client hits the on-disk cache.
4087        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4088
4089        // Then this call hits the in-memory cache.
4090        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4091
4092        drop(well_known_mock);
4093
4094        // Now, reset the cache, and observe the endpoints being called again once.
4095        client.reset_well_known().await.unwrap();
4096
4097        server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
4098
4099        // Hits network again.
4100        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4101        // Hits in-memory cache again.
4102        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4103
4104        // Force an expiry of the data.
4105        let well_known = client.well_known().await;
4106        let mut ttl_value = TtlValue::new(well_known);
4107        ttl_value.expire();
4108        client.inner.caches.well_known.set_value(ttl_value);
4109
4110        // Call the method again to trigger a cache refresh background task.
4111        client.well_known().await;
4112
4113        // We wait for the task to finish, the endpoint should have been called again.
4114        // We need to wait a bit because the first requests using the server name of the
4115        // user will fail, only the requests using the homeserver URL will succeed.
4116        sleep(Duration::from_secs(5)).await;
4117        assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
4118    }
4119
4120    #[async_test]
4121    async fn test_missing_well_known_caching() {
4122        let server = MatrixMockServer::new().await;
4123        let rtc_foci: Vec<RtcFocusInfo> = vec![];
4124
4125        let well_known_mock = server
4126            .mock_well_known()
4127            .error_unrecognized()
4128            .named("first well-known mock")
4129            .expect(1)
4130            .mount_as_scoped()
4131            .await;
4132
4133        let memory_store = Arc::new(MemoryStore::new());
4134        let client = server
4135            .client_builder()
4136            .on_builder(|builder| {
4137                builder.store_config(
4138                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4139                        .state_store(memory_store.clone()),
4140                )
4141            })
4142            .build()
4143            .await;
4144
4145        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4146
4147        // This subsequent call hits the in-memory cache.
4148        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4149
4150        drop(client);
4151
4152        let client = server
4153            .client_builder()
4154            .on_builder(|builder| {
4155                builder.store_config(
4156                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4157                        .state_store(memory_store.clone()),
4158                )
4159            })
4160            .build()
4161            .await;
4162
4163        // This call to the new client hits the on-disk cache.
4164        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4165
4166        // Then this call hits the in-memory cache.
4167        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4168
4169        drop(well_known_mock);
4170
4171        // Now, reset the cache, and observe the endpoints being called again once.
4172        client.reset_well_known().await.unwrap();
4173
4174        server
4175            .mock_well_known()
4176            .error_unrecognized()
4177            .expect(1)
4178            .named("second well-known mock")
4179            .mount()
4180            .await;
4181
4182        // Hits network again.
4183        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4184        // Hits in-memory cache again.
4185        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4186    }
4187
4188    #[async_test]
4189    async fn test_no_network_doesnt_cause_infinite_retries() {
4190        // We want infinite retries for transient errors.
4191        let client = MockClientBuilder::new(None)
4192            .on_builder(|builder| builder.request_config(RequestConfig::new()))
4193            .build()
4194            .await;
4195
4196        // We don't define a mock server on purpose here, so that the error is really a
4197        // network error.
4198        client.whoami().await.unwrap_err();
4199    }
4200
4201    #[async_test]
4202    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4203        let server = MatrixMockServer::new().await;
4204        let client = server.client_builder().build().await;
4205
4206        let room_id = room_id!("!room:example.org");
4207
4208        server
4209            .mock_sync()
4210            .ok_and_run(&client, |builder| {
4211                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4212            })
4213            .await;
4214
4215        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4216        assert_eq!(room.room_id(), room_id);
4217    }
4218
4219    #[async_test]
4220    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4221        let server = MatrixMockServer::new().await;
4222        let client = server.client_builder().build().await;
4223
4224        let room_id = room_id!("!room:example.org");
4225
4226        let client = Arc::new(client);
4227
4228        // Perform the /sync request with a delay so it starts after the
4229        // `await_room_remote_echo` call has happened
4230        spawn({
4231            let client = client.clone();
4232            async move {
4233                sleep(Duration::from_millis(100)).await;
4234
4235                server
4236                    .mock_sync()
4237                    .ok_and_run(&client, |builder| {
4238                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4239                    })
4240                    .await;
4241            }
4242        });
4243
4244        let room =
4245            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4246        assert_eq!(room.room_id(), room_id);
4247    }
4248
4249    #[async_test]
4250    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4251        let client = MockClientBuilder::new(None).build().await;
4252
4253        let room_id = room_id!("!room:example.org");
4254        // Room is not present so the client won't be able to find it. The call will
4255        // timeout.
4256        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4257    }
4258
4259    #[async_test]
4260    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4261        let server = MatrixMockServer::new().await;
4262        let client = server.client_builder().build().await;
4263
4264        server.mock_create_room().ok().mount().await;
4265
4266        // Create a room in the internal store
4267        let room = client
4268            .create_room(assign!(CreateRoomRequest::new(), {
4269                invite: vec![],
4270                is_direct: false,
4271            }))
4272            .await
4273            .unwrap();
4274
4275        // Room is locally present, but not synced, the call will timeout
4276        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4277            .await
4278            .unwrap_err();
4279    }
4280
4281    #[async_test]
4282    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4283        let server = MatrixMockServer::new().await;
4284        let client = server.client_builder().build().await;
4285
4286        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4287
4288        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4289        assert_matches!(ret, Ok(true));
4290    }
4291
4292    #[async_test]
4293    async fn test_is_room_alias_available_if_alias_is_resolved() {
4294        let server = MatrixMockServer::new().await;
4295        let client = server.client_builder().build().await;
4296
4297        server
4298            .mock_room_directory_resolve_alias()
4299            .ok("!some_room_id:matrix.org", Vec::new())
4300            .expect(1)
4301            .mount()
4302            .await;
4303
4304        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4305        assert_matches!(ret, Ok(false));
4306    }
4307
4308    #[async_test]
4309    async fn test_is_room_alias_available_if_error_found() {
4310        let server = MatrixMockServer::new().await;
4311        let client = server.client_builder().build().await;
4312
4313        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4314
4315        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4316        assert_matches!(ret, Err(_));
4317    }
4318
4319    #[async_test]
4320    async fn test_create_room_alias() {
4321        let server = MatrixMockServer::new().await;
4322        let client = server.client_builder().build().await;
4323
4324        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4325
4326        let ret = client
4327            .create_room_alias(
4328                room_alias_id!("#some_alias:matrix.org"),
4329                room_id!("!some_room:matrix.org"),
4330            )
4331            .await;
4332        assert_matches!(ret, Ok(()));
4333    }
4334
4335    #[async_test]
4336    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4337        let server = MatrixMockServer::new().await;
4338        let client = server.client_builder().build().await;
4339
4340        let room_id = room_id!("!a-room:matrix.org");
4341
4342        // Make sure the summary endpoint is called once
4343        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4344
4345        // We create a locally cached invited room
4346        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4347
4348        // And we get a preview, the server endpoint was reached
4349        let preview = client
4350            .get_room_preview(room_id.into(), Vec::new())
4351            .await
4352            .expect("Room preview should be retrieved");
4353
4354        assert_eq!(invited_room.room_id(), preview.room_id);
4355    }
4356
4357    #[async_test]
4358    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4359        let server = MatrixMockServer::new().await;
4360        let client = server.client_builder().build().await;
4361
4362        let room_id = room_id!("!a-room:matrix.org");
4363
4364        // Make sure the summary endpoint is called once
4365        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4366
4367        // We create a locally cached left room
4368        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4369
4370        // And we get a preview, the server endpoint was reached
4371        let preview = client
4372            .get_room_preview(room_id.into(), Vec::new())
4373            .await
4374            .expect("Room preview should be retrieved");
4375
4376        assert_eq!(left_room.room_id(), preview.room_id);
4377    }
4378
4379    #[async_test]
4380    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4381        let server = MatrixMockServer::new().await;
4382        let client = server.client_builder().build().await;
4383
4384        let room_id = room_id!("!a-room:matrix.org");
4385
4386        // Make sure the summary endpoint is called once
4387        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4388
4389        // We create a locally cached knocked room
4390        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4391
4392        // And we get a preview, the server endpoint was reached
4393        let preview = client
4394            .get_room_preview(room_id.into(), Vec::new())
4395            .await
4396            .expect("Room preview should be retrieved");
4397
4398        assert_eq!(knocked_room.room_id(), preview.room_id);
4399    }
4400
4401    #[async_test]
4402    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4403        let server = MatrixMockServer::new().await;
4404        let client = server.client_builder().build().await;
4405
4406        let room_id = room_id!("!a-room:matrix.org");
4407
4408        // Make sure the summary endpoint is not called
4409        server.mock_room_summary().ok(room_id).never().mount().await;
4410
4411        // We create a locally cached joined room
4412        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4413
4414        // And we get a preview, no server endpoint was reached
4415        let preview = client
4416            .get_room_preview(room_id.into(), Vec::new())
4417            .await
4418            .expect("Room preview should be retrieved");
4419
4420        assert_eq!(joined_room.room_id(), preview.room_id);
4421    }
4422
4423    #[async_test]
4424    async fn test_media_preview_config() {
4425        let server = MatrixMockServer::new().await;
4426        let client = server.client_builder().build().await;
4427
4428        server
4429            .mock_sync()
4430            .ok_and_run(&client, |builder| {
4431                builder.add_custom_global_account_data(json!({
4432                    "content": {
4433                        "media_previews": "private",
4434                        "invite_avatars": "off"
4435                    },
4436                    "type": "m.media_preview_config"
4437                }));
4438            })
4439            .await;
4440
4441        let (initial_value, stream) =
4442            client.account().observe_media_preview_config().await.unwrap();
4443
4444        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4445        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4446        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4447        pin_mut!(stream);
4448        assert_pending!(stream);
4449
4450        server
4451            .mock_sync()
4452            .ok_and_run(&client, |builder| {
4453                builder.add_custom_global_account_data(json!({
4454                    "content": {
4455                        "media_previews": "off",
4456                        "invite_avatars": "on"
4457                    },
4458                    "type": "m.media_preview_config"
4459                }));
4460            })
4461            .await;
4462
4463        assert_next_matches!(
4464            stream,
4465            MediaPreviewConfigEventContent {
4466                media_previews: Some(MediaPreviews::Off),
4467                invite_avatars: Some(InviteAvatars::On),
4468                ..
4469            }
4470        );
4471        assert_pending!(stream);
4472    }
4473
4474    #[async_test]
4475    async fn test_unstable_media_preview_config() {
4476        let server = MatrixMockServer::new().await;
4477        let client = server.client_builder().build().await;
4478
4479        server
4480            .mock_sync()
4481            .ok_and_run(&client, |builder| {
4482                builder.add_custom_global_account_data(json!({
4483                    "content": {
4484                        "media_previews": "private",
4485                        "invite_avatars": "off"
4486                    },
4487                    "type": "io.element.msc4278.media_preview_config"
4488                }));
4489            })
4490            .await;
4491
4492        let (initial_value, stream) =
4493            client.account().observe_media_preview_config().await.unwrap();
4494
4495        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4496        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4497        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4498        pin_mut!(stream);
4499        assert_pending!(stream);
4500
4501        server
4502            .mock_sync()
4503            .ok_and_run(&client, |builder| {
4504                builder.add_custom_global_account_data(json!({
4505                    "content": {
4506                        "media_previews": "off",
4507                        "invite_avatars": "on"
4508                    },
4509                    "type": "io.element.msc4278.media_preview_config"
4510                }));
4511            })
4512            .await;
4513
4514        assert_next_matches!(
4515            stream,
4516            MediaPreviewConfigEventContent {
4517                media_previews: Some(MediaPreviews::Off),
4518                invite_avatars: Some(InviteAvatars::On),
4519                ..
4520            }
4521        );
4522        assert_pending!(stream);
4523    }
4524
4525    #[async_test]
4526    async fn test_media_preview_config_not_found() {
4527        let server = MatrixMockServer::new().await;
4528        let client = server.client_builder().build().await;
4529
4530        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4531
4532        assert!(initial_value.is_none());
4533    }
4534
4535    #[async_test]
4536    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4537        // The default Matrix version we use is 1.11 or higher, so authenticated media
4538        // is supported.
4539        let server = MatrixMockServer::new().await;
4540        let client = server.client_builder().build().await;
4541
4542        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4543
4544        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4545        client.load_or_fetch_max_upload_size().await.unwrap();
4546
4547        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4548    }
4549
4550    #[async_test]
4551    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4552        // The server must advertise support for the stable feature for authenticated
4553        // media support, so we mock the `GET /versions` response.
4554        let server = MatrixMockServer::new().await;
4555        let client = server.client_builder().no_server_versions().build().await;
4556
4557        server
4558            .mock_versions()
4559            .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4560            .with_feature("org.matrix.msc3916.stable", true)
4561            .ok()
4562            .named("versions")
4563            .expect(1)
4564            .mount()
4565            .await;
4566
4567        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4568
4569        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4570        client.load_or_fetch_max_upload_size().await.unwrap();
4571
4572        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4573    }
4574
4575    #[async_test]
4576    async fn test_load_or_fetch_max_upload_size_no_auth() {
4577        // The server must not support Matrix 1.11 or higher for unauthenticated
4578        // media requests, so we mock the `GET /versions` response.
4579        let server = MatrixMockServer::new().await;
4580        let client = server.client_builder().no_server_versions().build().await;
4581
4582        server
4583            .mock_versions()
4584            .with_versions(vec!["v1.1"])
4585            .ok()
4586            .named("versions")
4587            .expect(1)
4588            .mount()
4589            .await;
4590
4591        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4592
4593        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4594        client.load_or_fetch_max_upload_size().await.unwrap();
4595
4596        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4597    }
4598
4599    #[async_test]
4600    async fn test_uploading_a_too_large_media_file() {
4601        let server = MatrixMockServer::new().await;
4602        let client = server.client_builder().build().await;
4603
4604        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4605        client.load_or_fetch_max_upload_size().await.unwrap();
4606        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4607
4608        let data = vec![1, 2];
4609        let upload_request =
4610            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4611        let request = SendRequest {
4612            client: client.clone(),
4613            request: upload_request,
4614            config: None,
4615            send_progress: SharedObservable::new(TransmissionProgress::default()),
4616        };
4617        let media_request = SendMediaUploadRequest::new(request);
4618
4619        let error = media_request.await.err();
4620        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4621        assert_eq!(max, uint!(1));
4622        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4623    }
4624
4625    #[async_test]
4626    async fn test_dont_ignore_timeout_on_first_sync() {
4627        let server = MatrixMockServer::new().await;
4628        let client = server.client_builder().build().await;
4629
4630        server
4631            .mock_sync()
4632            .timeout(Some(Duration::from_secs(30)))
4633            .ok(|_| {})
4634            .mock_once()
4635            .named("sync_with_timeout")
4636            .mount()
4637            .await;
4638
4639        // Call the endpoint once to check the timeout.
4640        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4641
4642        timeout(Duration::from_secs(1), async {
4643            stream.next().await.unwrap().unwrap();
4644        })
4645        .await
4646        .unwrap();
4647    }
4648
4649    #[async_test]
4650    async fn test_ignore_timeout_on_first_sync() {
4651        let server = MatrixMockServer::new().await;
4652        let client = server.client_builder().build().await;
4653
4654        server
4655            .mock_sync()
4656            .timeout(None)
4657            .ok(|_| {})
4658            .mock_once()
4659            .named("sync_no_timeout")
4660            .mount()
4661            .await;
4662        server
4663            .mock_sync()
4664            .timeout(Some(Duration::from_secs(30)))
4665            .ok(|_| {})
4666            .mock_once()
4667            .named("sync_with_timeout")
4668            .mount()
4669            .await;
4670
4671        // Call each version of the endpoint once to check the timeouts.
4672        let mut stream = Box::pin(
4673            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4674        );
4675
4676        timeout(Duration::from_secs(1), async {
4677            stream.next().await.unwrap().unwrap();
4678            stream.next().await.unwrap().unwrap();
4679        })
4680        .await
4681        .unwrap();
4682    }
4683
4684    #[async_test]
4685    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4686        let server = MatrixMockServer::new().await;
4687        let client = server.client_builder().build().await;
4688        // This is the user ID that is inside MemberAdditional.
4689        // Note the confusing username, so we can share
4690        // GlobalAccountDataTestEvent::Direct with the invited test.
4691        let user_id = user_id!("@invited:localhost");
4692
4693        // When we receive a sync response saying "invited" is invited to a DM
4694        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4695        let response = SyncResponseBuilder::default()
4696            .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4697            .add_global_account_data(
4698                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4699            )
4700            .build_sync_response();
4701        client.base_client().receive_sync_response(response).await.unwrap();
4702
4703        // Then get_dm_room finds this room
4704        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4705        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4706    }
4707
4708    #[async_test]
4709    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4710        let server = MatrixMockServer::new().await;
4711        let client = server.client_builder().build().await;
4712        // This is the user ID that is inside MemberInvite
4713        let user_id = user_id!("@invited:localhost");
4714
4715        // When we receive a sync response saying "invited" is invited to a DM
4716        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4717        let response = SyncResponseBuilder::default()
4718            .add_joined_room(
4719                JoinedRoomBuilder::default()
4720                    .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4721            )
4722            .add_global_account_data(
4723                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4724            )
4725            .build_sync_response();
4726        client.base_client().receive_sync_response(response).await.unwrap();
4727
4728        // Then get_dm_room finds this room
4729        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4730        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4731    }
4732
4733    #[async_test]
4734    async fn test_get_dm_room_still_finds_left_room() {
4735        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4736        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4737
4738        let server = MatrixMockServer::new().await;
4739        let client = server.client_builder().build().await;
4740        // This is the user ID that is inside MemberAdditional.
4741        // Note the confusing username, so we can share
4742        // GlobalAccountDataTestEvent::Direct with the invited test.
4743        let user_id = user_id!("@invited:localhost");
4744
4745        // When we receive a sync response saying "invited" has left a DM
4746        let f = EventFactory::new().sender(user_id);
4747        let response = SyncResponseBuilder::default()
4748            .add_joined_room(
4749                JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4750            )
4751            .add_global_account_data(
4752                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4753            )
4754            .build_sync_response();
4755        client.base_client().receive_sync_response(response).await.unwrap();
4756
4757        // Then get_dm_room finds this room
4758        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4759        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4760    }
4761
4762    #[async_test]
4763    async fn test_device_exists() {
4764        let server = MatrixMockServer::new().await;
4765        let client = server.client_builder().build().await;
4766
4767        server.mock_get_device().ok().expect(1).mount().await;
4768
4769        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4770    }
4771
4772    #[async_test]
4773    async fn test_device_exists_404() {
4774        let server = MatrixMockServer::new().await;
4775        let client = server.client_builder().build().await;
4776
4777        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4778    }
4779
4780    #[async_test]
4781    async fn test_device_exists_500() {
4782        let server = MatrixMockServer::new().await;
4783        let client = server.client_builder().build().await;
4784
4785        server.mock_get_device().error500().expect(1).mount().await;
4786
4787        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4788    }
4789
4790    #[async_test]
4791    async fn test_fetching_well_known_with_homeserver_url() {
4792        let server = MatrixMockServer::new().await;
4793        let client = server.client_builder().build().await;
4794        server.mock_well_known().ok().mount().await;
4795
4796        assert_matches!(client.fetch_client_well_known().await, Some(_));
4797    }
4798
4799    #[async_test]
4800    async fn test_fetching_well_known_with_server_name() {
4801        let server = MatrixMockServer::new().await;
4802        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4803
4804        server.mock_well_known().ok().mount().await;
4805
4806        let client = MockClientBuilder::new(None)
4807            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4808            .build()
4809            .await;
4810
4811        assert_matches!(client.fetch_client_well_known().await, Some(_));
4812    }
4813
4814    #[async_test]
4815    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4816        let server = MatrixMockServer::new().await;
4817        server.mock_well_known().ok().mount().await;
4818
4819        let user_id =
4820            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4821        let client = MockClientBuilder::new(None)
4822            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4823            .build()
4824            .await;
4825
4826        assert_matches!(client.fetch_client_well_known().await, Some(_));
4827    }
4828}