matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
33use matrix_sdk_base::{
34    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
35    StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
36    event_cache::store::EventCacheStoreLock,
37    media::store::MediaStoreLock,
38    store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
39    sync::{Notification, RoomUpdates},
40};
41use matrix_sdk_common::ttl_cache::TtlCache;
42#[cfg(feature = "e2e-encryption")]
43use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
44use ruma::{
45    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
46    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
47    api::{
48        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
49        auth_scheme::{AuthScheme, SendAccessToken},
50        client::{
51            account::whoami,
52            alias::{create_alias, delete_alias, get_alias},
53            authenticated_media,
54            device::{self, delete_devices, get_devices, update_device},
55            directory::{get_public_rooms, get_public_rooms_filtered},
56            discovery::{
57                discover_homeserver::{self, RtcFocusInfo},
58                get_capabilities::{self, v3::Capabilities},
59                get_supported_versions,
60            },
61            error::ErrorKind,
62            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
63            knock::knock_room,
64            media,
65            membership::{join_room_by_id, join_room_by_id_or_alias},
66            room::create_room,
67            session::login::v3::DiscoveryInfo,
68            sync::sync_events,
69            threads::get_thread_subscriptions_changes,
70            uiaa,
71            user_directory::search_users,
72        },
73        error::FromHttpResponseError,
74        federation::discovery::get_server_version,
75        path_builder::PathBuilder,
76    },
77    assign,
78    events::direct::DirectUserIdentifier,
79    push::Ruleset,
80    time::Instant,
81};
82use serde::de::DeserializeOwned;
83use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
84use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
85use url::Url;
86
87use self::futures::SendRequest;
88use crate::{
89    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
90    Room, SessionTokens, TransmissionProgress,
91    authentication::{
92        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
93        oauth::OAuth,
94    },
95    client::thread_subscriptions::ThreadSubscriptionCatchup,
96    config::{RequestConfig, SyncToken},
97    deduplicating_handler::DeduplicatingHandler,
98    error::HttpResult,
99    event_cache::EventCache,
100    event_handler::{
101        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
102        EventHandlerStore, ObservableEventHandler, SyncEvent,
103    },
104    http_client::{HttpClient, SupportedPathBuilder},
105    latest_events::LatestEvents,
106    media::MediaError,
107    notification_settings::NotificationSettings,
108    room::RoomMember,
109    room_preview::RoomPreview,
110    send_queue::{SendQueue, SendQueueData},
111    sliding_sync::Version as SlidingSyncVersion,
112    sync::{RoomUpdate, SyncResponse},
113};
114#[cfg(feature = "e2e-encryption")]
115use crate::{
116    cross_process_lock::CrossProcessLock,
117    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
118};
119
120mod builder;
121pub(crate) mod caches;
122pub(crate) mod futures;
123pub(crate) mod thread_subscriptions;
124
125pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
126#[cfg(feature = "experimental-search")]
127use crate::search_index::SearchIndex;
128
129#[cfg(not(target_family = "wasm"))]
130type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
131#[cfg(target_family = "wasm")]
132type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
133
134#[cfg(not(target_family = "wasm"))]
135type NotificationHandlerFn =
136    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
137#[cfg(target_family = "wasm")]
138type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
139
140/// Enum controlling if a loop running callbacks should continue or abort.
141///
142/// This is mainly used in the [`sync_with_callback`] method, the return value
143/// of the provided callback controls if the sync loop should be exited.
144///
145/// [`sync_with_callback`]: #method.sync_with_callback
146#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub enum LoopCtrl {
148    /// Continue running the loop.
149    Continue,
150    /// Break out of the loop.
151    Break,
152}
153
154/// Represents changes that can occur to a `Client`s `Session`.
155#[derive(Debug, Clone, PartialEq)]
156pub enum SessionChange {
157    /// The session's token is no longer valid.
158    UnknownToken {
159        /// Whether or not the session was soft logged out
160        soft_logout: bool,
161    },
162    /// The session's tokens have been refreshed.
163    TokensRefreshed,
164}
165
166/// Information about the server vendor obtained from the federation API.
167#[derive(Debug, Clone, PartialEq, Eq)]
168#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
169pub struct ServerVendorInfo {
170    /// The server name.
171    pub server_name: String,
172    /// The server version.
173    pub version: String,
174}
175
176/// An async/await enabled Matrix client.
177///
178/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
179#[derive(Clone)]
180pub struct Client {
181    pub(crate) inner: Arc<ClientInner>,
182}
183
184#[derive(Default)]
185pub(crate) struct ClientLocks {
186    /// Lock ensuring that only a single room may be marked as a DM at once.
187    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
188    /// explanation.
189    pub(crate) mark_as_dm_lock: Mutex<()>,
190
191    /// Lock ensuring that only a single secret store is getting opened at the
192    /// same time.
193    ///
194    /// This is important so we don't accidentally create multiple different new
195    /// default secret storage keys.
196    #[cfg(feature = "e2e-encryption")]
197    pub(crate) open_secret_store_lock: Mutex<()>,
198
199    /// Lock ensuring that we're only storing a single secret at a time.
200    ///
201    /// Take a look at the [`SecretStore::put_secret`] method for a more
202    /// detailed explanation.
203    ///
204    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
205    #[cfg(feature = "e2e-encryption")]
206    pub(crate) store_secret_lock: Mutex<()>,
207
208    /// Lock ensuring that only one method at a time might modify our backup.
209    #[cfg(feature = "e2e-encryption")]
210    pub(crate) backup_modify_lock: Mutex<()>,
211
212    /// Lock ensuring that we're going to attempt to upload backups for a single
213    /// requester.
214    #[cfg(feature = "e2e-encryption")]
215    pub(crate) backup_upload_lock: Mutex<()>,
216
217    /// Handler making sure we only have one group session sharing request in
218    /// flight per room.
219    #[cfg(feature = "e2e-encryption")]
220    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
221
222    /// Lock making sure we're only doing one key claim request at a time.
223    #[cfg(feature = "e2e-encryption")]
224    pub(crate) key_claim_lock: Mutex<()>,
225
226    /// Handler to ensure that only one members request is running at a time,
227    /// given a room.
228    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
229
230    /// Handler to ensure that only one encryption state request is running at a
231    /// time, given a room.
232    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
233
234    /// Deduplicating handler for sending read receipts. The string is an
235    /// internal implementation detail, see [`Self::send_single_receipt`].
236    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
237
238    #[cfg(feature = "e2e-encryption")]
239    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
240
241    /// Latest "generation" of data known by the crypto store.
242    ///
243    /// This is a counter that only increments, set in the database (and can
244    /// wrap). It's incremented whenever some process acquires a lock for the
245    /// first time. *This assumes the crypto store lock is being held, to
246    /// avoid data races on writing to this value in the store*.
247    ///
248    /// The current process will maintain this value in local memory and in the
249    /// DB over time. Observing a different value than the one read in
250    /// memory, when reading from the store indicates that somebody else has
251    /// written into the database under our feet.
252    ///
253    /// TODO: this should live in the `OlmMachine`, since it's information
254    /// related to the lock. As of today (2023-07-28), we blow up the entire
255    /// olm machine when there's a generation mismatch. So storing the
256    /// generation in the olm machine would make the client think there's
257    /// *always* a mismatch, and that's why we need to store the generation
258    /// outside the `OlmMachine`.
259    #[cfg(feature = "e2e-encryption")]
260    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
261}
262
263pub(crate) struct ClientInner {
264    /// All the data related to authentication and authorization.
265    pub(crate) auth_ctx: Arc<AuthCtx>,
266
267    /// The URL of the server.
268    ///
269    /// Not to be confused with the `Self::homeserver`. `server` is usually
270    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
271    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
272    /// homeserver (at the time of writing — 2024-08-28).
273    ///
274    /// This value is optional depending on how the `Client` has been built.
275    /// If it's been built from a homeserver URL directly, we don't know the
276    /// server. However, if the `Client` has been built from a server URL or
277    /// name, then the homeserver has been discovered, and we know both.
278    server: Option<Url>,
279
280    /// The URL of the homeserver to connect to.
281    ///
282    /// This is the URL for the client-server Matrix API.
283    homeserver: StdRwLock<Url>,
284
285    /// The sliding sync version.
286    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
287
288    /// The underlying HTTP client.
289    pub(crate) http_client: HttpClient,
290
291    /// User session data.
292    pub(super) base_client: BaseClient,
293
294    /// Collection of in-memory caches for the [`Client`].
295    pub(crate) caches: ClientCaches,
296
297    /// Collection of locks individual client methods might want to use, either
298    /// to ensure that only a single call to a method happens at once or to
299    /// deduplicate multiple calls to a method.
300    pub(crate) locks: ClientLocks,
301
302    /// The cross-process store locks holder name.
303    ///
304    /// The SDK provides cross-process store locks (see
305    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
306    /// `holder_name` is the value used for all cross-process store locks
307    /// used by this `Client`.
308    ///
309    /// If multiple `Client`s are running in different processes, this
310    /// value MUST be different for each `Client`.
311    cross_process_store_locks_holder_name: String,
312
313    /// A mapping of the times at which the current user sent typing notices,
314    /// keyed by room.
315    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
316
317    /// Event handlers. See `add_event_handler`.
318    pub(crate) event_handlers: EventHandlerStore,
319
320    /// Notification handlers. See `register_notification_handler`.
321    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
322
323    /// The sender-side of channels used to receive room updates.
324    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
325
326    /// The sender-side of a channel used to observe all the room updates of a
327    /// sync response.
328    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
329
330    /// Whether the client should update its homeserver URL with the discovery
331    /// information present in the login response.
332    respect_login_well_known: bool,
333
334    /// An event that can be listened on to wait for a successful sync. The
335    /// event will only be fired if a sync loop is running. Can be used for
336    /// synchronization, e.g. if we send out a request to create a room, we can
337    /// wait for the sync to get the data to fetch a room object from the state
338    /// store.
339    pub(crate) sync_beat: event_listener::Event,
340
341    /// A central cache for events, inactive first.
342    ///
343    /// It becomes active when [`EventCache::subscribe`] is called.
344    pub(crate) event_cache: OnceCell<EventCache>,
345
346    /// End-to-end encryption related state.
347    #[cfg(feature = "e2e-encryption")]
348    pub(crate) e2ee: EncryptionData,
349
350    /// The verification state of our own device.
351    #[cfg(feature = "e2e-encryption")]
352    pub(crate) verification_state: SharedObservable<VerificationState>,
353
354    /// Whether to enable the experimental support for sending and receiving
355    /// encrypted room history on invite, per [MSC4268].
356    ///
357    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
358    #[cfg(feature = "e2e-encryption")]
359    pub(crate) enable_share_history_on_invite: bool,
360
361    /// Data related to the [`SendQueue`].
362    ///
363    /// [`SendQueue`]: crate::send_queue::SendQueue
364    pub(crate) send_queue_data: Arc<SendQueueData>,
365
366    /// The `max_upload_size` value of the homeserver, it contains the max
367    /// request size you can send.
368    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
369
370    /// The entry point to get the [`LatestEvent`] of rooms and threads.
371    ///
372    /// [`LatestEvent`]: crate::latest_event::LatestEvent
373    latest_events: OnceCell<LatestEvents>,
374
375    /// Service handling the catching up of thread subscriptions in the
376    /// background.
377    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
378
379    #[cfg(feature = "experimental-search")]
380    /// Handler for [`RoomIndex`]'s of each room
381    search_index: SearchIndex,
382}
383
384impl ClientInner {
385    /// Create a new `ClientInner`.
386    ///
387    /// All the fields passed as parameters here are those that must be cloned
388    /// upon instantiation of a sub-client, e.g. a client specialized for
389    /// notifications.
390    #[allow(clippy::too_many_arguments)]
391    async fn new(
392        auth_ctx: Arc<AuthCtx>,
393        server: Option<Url>,
394        homeserver: Url,
395        sliding_sync_version: SlidingSyncVersion,
396        http_client: HttpClient,
397        base_client: BaseClient,
398        server_info: ClientServerInfo,
399        respect_login_well_known: bool,
400        event_cache: OnceCell<EventCache>,
401        send_queue: Arc<SendQueueData>,
402        latest_events: OnceCell<LatestEvents>,
403        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
404        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
405        cross_process_store_locks_holder_name: String,
406        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
407        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
408    ) -> Arc<Self> {
409        let caches = ClientCaches {
410            server_info: server_info.into(),
411            server_metadata: Mutex::new(TtlCache::new()),
412        };
413
414        let client = Self {
415            server,
416            homeserver: StdRwLock::new(homeserver),
417            auth_ctx,
418            sliding_sync_version: StdRwLock::new(sliding_sync_version),
419            http_client,
420            base_client,
421            caches,
422            locks: Default::default(),
423            cross_process_store_locks_holder_name,
424            typing_notice_times: Default::default(),
425            event_handlers: Default::default(),
426            notification_handlers: Default::default(),
427            room_update_channels: Default::default(),
428            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
429            // ballast for all observers to catch up.
430            room_updates_sender: broadcast::Sender::new(32),
431            respect_login_well_known,
432            sync_beat: event_listener::Event::new(),
433            event_cache,
434            send_queue_data: send_queue,
435            latest_events,
436            #[cfg(feature = "e2e-encryption")]
437            e2ee: EncryptionData::new(encryption_settings),
438            #[cfg(feature = "e2e-encryption")]
439            verification_state: SharedObservable::new(VerificationState::Unknown),
440            #[cfg(feature = "e2e-encryption")]
441            enable_share_history_on_invite,
442            server_max_upload_size: Mutex::new(OnceCell::new()),
443            #[cfg(feature = "experimental-search")]
444            search_index: search_index_handler,
445            thread_subscription_catchup,
446        };
447
448        #[allow(clippy::let_and_return)]
449        let client = Arc::new(client);
450
451        #[cfg(feature = "e2e-encryption")]
452        client.e2ee.initialize_tasks(&client);
453
454        let _ = client
455            .event_cache
456            .get_or_init(|| async {
457                EventCache::new(
458                    WeakClient::from_inner(&client),
459                    client.base_client.event_cache_store().clone(),
460                )
461            })
462            .await;
463
464        let _ = client
465            .thread_subscription_catchup
466            .get_or_init(|| async {
467                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
468            })
469            .await;
470
471        client
472    }
473}
474
475#[cfg(not(tarpaulin_include))]
476impl Debug for Client {
477    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
478        write!(fmt, "Client")
479    }
480}
481
482impl Client {
483    /// Create a new [`Client`] that will use the given homeserver.
484    ///
485    /// # Arguments
486    ///
487    /// * `homeserver_url` - The homeserver that the client should connect to.
488    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
489        Self::builder().homeserver_url(homeserver_url).build().await
490    }
491
492    /// Returns a subscriber that publishes an event every time the ignore user
493    /// list changes.
494    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
495        self.inner.base_client.subscribe_to_ignore_user_list_changes()
496    }
497
498    /// Create a new [`ClientBuilder`].
499    pub fn builder() -> ClientBuilder {
500        ClientBuilder::new()
501    }
502
503    pub(crate) fn base_client(&self) -> &BaseClient {
504        &self.inner.base_client
505    }
506
507    /// The underlying HTTP client.
508    pub fn http_client(&self) -> &reqwest::Client {
509        &self.inner.http_client.inner
510    }
511
512    pub(crate) fn locks(&self) -> &ClientLocks {
513        &self.inner.locks
514    }
515
516    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
517        &self.inner.auth_ctx
518    }
519
520    /// The cross-process store locks holder name.
521    ///
522    /// The SDK provides cross-process store locks (see
523    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
524    /// `holder_name` is the value used for all cross-process store locks
525    /// used by this `Client`.
526    pub fn cross_process_store_locks_holder_name(&self) -> &str {
527        &self.inner.cross_process_store_locks_holder_name
528    }
529
530    /// Change the homeserver URL used by this client.
531    ///
532    /// # Arguments
533    ///
534    /// * `homeserver_url` - The new URL to use.
535    fn set_homeserver(&self, homeserver_url: Url) {
536        *self.inner.homeserver.write().unwrap() = homeserver_url;
537    }
538
539    /// Change to a different homeserver and re-resolve well-known.
540    #[cfg(feature = "e2e-encryption")]
541    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
542        &self,
543        homeserver_url: Url,
544    ) -> Result<()> {
545        self.set_homeserver(homeserver_url);
546        self.reset_server_info().await?;
547        if let ServerInfo { well_known: Some(well_known), .. } =
548            self.load_or_fetch_server_info().await?
549        {
550            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
551        }
552        Ok(())
553    }
554
555    /// Get the capabilities of the homeserver.
556    ///
557    /// This method should be used to check what features are supported by the
558    /// homeserver.
559    ///
560    /// # Examples
561    ///
562    /// ```no_run
563    /// # use matrix_sdk::Client;
564    /// # use url::Url;
565    /// # async {
566    /// # let homeserver = Url::parse("http://example.com")?;
567    /// let client = Client::new(homeserver).await?;
568    ///
569    /// let capabilities = client.get_capabilities().await?;
570    ///
571    /// if capabilities.change_password.enabled {
572    ///     // Change password
573    /// }
574    /// # anyhow::Ok(()) };
575    /// ```
576    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
577        let res = self.send(get_capabilities::v3::Request::new()).await?;
578        Ok(res.capabilities)
579    }
580
581    /// Get the server vendor information from the federation API.
582    ///
583    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
584    /// both the server's software name and version.
585    ///
586    /// # Examples
587    ///
588    /// ```no_run
589    /// # use matrix_sdk::Client;
590    /// # use url::Url;
591    /// # async {
592    /// # let homeserver = Url::parse("http://example.com")?;
593    /// let client = Client::new(homeserver).await?;
594    ///
595    /// let server_info = client.server_vendor_info(None).await?;
596    /// println!(
597    ///     "Server: {}, Version: {}",
598    ///     server_info.server_name, server_info.version
599    /// );
600    /// # anyhow::Ok(()) };
601    /// ```
602    pub async fn server_vendor_info(
603        &self,
604        request_config: Option<RequestConfig>,
605    ) -> HttpResult<ServerVendorInfo> {
606        let res = self
607            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
608            .await?;
609
610        // Extract server info, using defaults if fields are missing.
611        let server = res.server.unwrap_or_default();
612        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
613        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
614
615        Ok(ServerVendorInfo { server_name: server_name_str, version })
616    }
617
618    /// Get a copy of the default request config.
619    ///
620    /// The default request config is what's used when sending requests if no
621    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
622    /// function with such a parameter.
623    ///
624    /// If the default request config was not customized through
625    /// [`ClientBuilder`] when creating this `Client`, the returned value will
626    /// be equivalent to [`RequestConfig::default()`].
627    pub fn request_config(&self) -> RequestConfig {
628        self.inner.http_client.request_config
629    }
630
631    /// Check whether the client has been activated.
632    ///
633    /// A client is considered active when:
634    ///
635    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
636    ///    is logged in,
637    /// 2. Has loaded cached data from storage,
638    /// 3. If encryption is enabled, it also initialized or restored its
639    ///    `OlmMachine`.
640    pub fn is_active(&self) -> bool {
641        self.inner.base_client.is_active()
642    }
643
644    /// The server used by the client.
645    ///
646    /// See `Self::server` to learn more.
647    pub fn server(&self) -> Option<&Url> {
648        self.inner.server.as_ref()
649    }
650
651    /// The homeserver of the client.
652    pub fn homeserver(&self) -> Url {
653        self.inner.homeserver.read().unwrap().clone()
654    }
655
656    /// Get the sliding sync version.
657    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
658        self.inner.sliding_sync_version.read().unwrap().clone()
659    }
660
661    /// Override the sliding sync version.
662    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
663        let mut lock = self.inner.sliding_sync_version.write().unwrap();
664        *lock = version;
665    }
666
667    /// Get the Matrix user session meta information.
668    ///
669    /// If the client is currently logged in, this will return a
670    /// [`SessionMeta`] object which contains the user ID and device ID.
671    /// Otherwise it returns `None`.
672    pub fn session_meta(&self) -> Option<&SessionMeta> {
673        self.base_client().session_meta()
674    }
675
676    /// Returns a receiver that gets events for each room info update. To watch
677    /// for new events, use `receiver.resubscribe()`.
678    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
679        self.base_client().room_info_notable_update_receiver()
680    }
681
682    /// Performs a search for users.
683    /// The search is performed case-insensitively on user IDs and display names
684    ///
685    /// # Arguments
686    ///
687    /// * `search_term` - The search term for the search
688    /// * `limit` - The maximum number of results to return. Defaults to 10.
689    ///
690    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
691    pub async fn search_users(
692        &self,
693        search_term: &str,
694        limit: u64,
695    ) -> HttpResult<search_users::v3::Response> {
696        let mut request = search_users::v3::Request::new(search_term.to_owned());
697
698        if let Some(limit) = UInt::new(limit) {
699            request.limit = limit;
700        }
701
702        self.send(request).await
703    }
704
705    /// Get the user id of the current owner of the client.
706    pub fn user_id(&self) -> Option<&UserId> {
707        self.session_meta().map(|s| s.user_id.as_ref())
708    }
709
710    /// Get the device ID that identifies the current session.
711    pub fn device_id(&self) -> Option<&DeviceId> {
712        self.session_meta().map(|s| s.device_id.as_ref())
713    }
714
715    /// Get the current access token for this session.
716    ///
717    /// Will be `None` if the client has not been logged in.
718    pub fn access_token(&self) -> Option<String> {
719        self.auth_ctx().access_token()
720    }
721
722    /// Get the current tokens for this session.
723    ///
724    /// To be notified of changes in the session tokens, use
725    /// [`Client::subscribe_to_session_changes()`] or
726    /// [`Client::set_session_callbacks()`].
727    ///
728    /// Returns `None` if the client has not been logged in.
729    pub fn session_tokens(&self) -> Option<SessionTokens> {
730        self.auth_ctx().session_tokens()
731    }
732
733    /// Access the authentication API used to log in this client.
734    ///
735    /// Will be `None` if the client has not been logged in.
736    pub fn auth_api(&self) -> Option<AuthApi> {
737        match self.auth_ctx().auth_data.get()? {
738            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
739            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
740        }
741    }
742
743    /// Get the whole session info of this client.
744    ///
745    /// Will be `None` if the client has not been logged in.
746    ///
747    /// Can be used with [`Client::restore_session`] to restore a previously
748    /// logged-in session.
749    pub fn session(&self) -> Option<AuthSession> {
750        match self.auth_api()? {
751            AuthApi::Matrix(api) => api.session().map(Into::into),
752            AuthApi::OAuth(api) => api.full_session().map(Into::into),
753        }
754    }
755
756    /// Get a reference to the state store.
757    pub fn state_store(&self) -> &DynStateStore {
758        self.base_client().state_store()
759    }
760
761    /// Get a reference to the event cache store.
762    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
763        self.base_client().event_cache_store()
764    }
765
766    /// Get a reference to the media store.
767    pub fn media_store(&self) -> &MediaStoreLock {
768        self.base_client().media_store()
769    }
770
771    /// Access the native Matrix authentication API with this client.
772    pub fn matrix_auth(&self) -> MatrixAuth {
773        MatrixAuth::new(self.clone())
774    }
775
776    /// Get the account of the current owner of the client.
777    pub fn account(&self) -> Account {
778        Account::new(self.clone())
779    }
780
781    /// Get the encryption manager of the client.
782    #[cfg(feature = "e2e-encryption")]
783    pub fn encryption(&self) -> Encryption {
784        Encryption::new(self.clone())
785    }
786
787    /// Get the media manager of the client.
788    pub fn media(&self) -> Media {
789        Media::new(self.clone())
790    }
791
792    /// Get the pusher manager of the client.
793    pub fn pusher(&self) -> Pusher {
794        Pusher::new(self.clone())
795    }
796
797    /// Access the OAuth 2.0 API of the client.
798    pub fn oauth(&self) -> OAuth {
799        OAuth::new(self.clone())
800    }
801
802    /// Register a handler for a specific event type.
803    ///
804    /// The handler is a function or closure with one or more arguments. The
805    /// first argument is the event itself. All additional arguments are
806    /// "context" arguments: They have to implement [`EventHandlerContext`].
807    /// This trait is named that way because most of the types implementing it
808    /// give additional context about an event: The room it was in, its raw form
809    /// and other similar things. As two exceptions to this,
810    /// [`Client`] and [`EventHandlerHandle`] also implement the
811    /// `EventHandlerContext` trait so you don't have to clone your client
812    /// into the event handler manually and a handler can decide to remove
813    /// itself.
814    ///
815    /// Some context arguments are not universally applicable. A context
816    /// argument that isn't available for the given event type will result in
817    /// the event handler being skipped and an error being logged. The following
818    /// context argument types are only available for a subset of event types:
819    ///
820    /// * [`Room`] is only available for room-specific events, i.e. not for
821    ///   events like global account data events or presence events.
822    ///
823    /// You can provide custom context via
824    /// [`add_event_handler_context`](Client::add_event_handler_context) and
825    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
826    /// into the event handler.
827    ///
828    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
829    ///
830    /// # Examples
831    ///
832    /// ```no_run
833    /// use matrix_sdk::{
834    ///     deserialized_responses::EncryptionInfo,
835    ///     event_handler::Ctx,
836    ///     ruma::{
837    ///         events::{
838    ///             macros::EventContent,
839    ///             push_rules::PushRulesEvent,
840    ///             room::{
841    ///                 message::SyncRoomMessageEvent,
842    ///                 topic::SyncRoomTopicEvent,
843    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
844    ///             },
845    ///         },
846    ///         push::Action,
847    ///         Int, MilliSecondsSinceUnixEpoch,
848    ///     },
849    ///     Client, Room,
850    /// };
851    /// use serde::{Deserialize, Serialize};
852    ///
853    /// # async fn example(client: Client) {
854    /// client.add_event_handler(
855    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
856    ///         // Common usage: Room event plus room and client.
857    ///     },
858    /// );
859    /// client.add_event_handler(
860    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
861    ///         async move {
862    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
863    ///             // unencrypted events and events that were decrypted by the SDK.
864    ///         }
865    ///     },
866    /// );
867    /// client.add_event_handler(
868    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
869    ///         async move {
870    ///             // A `Vec<Action>` parameter allows you to know which push actions
871    ///             // are applicable for an event. For example, an event with
872    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
873    ///             // in the timeline.
874    ///         }
875    ///     },
876    /// );
877    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
878    ///     // You can omit any or all arguments after the first.
879    /// });
880    ///
881    /// // Registering a temporary event handler:
882    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
883    ///     /* Event handler */
884    /// });
885    /// client.remove_event_handler(handle);
886    ///
887    /// // Registering custom event handler context:
888    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
889    /// struct MyContext {
890    ///     number: usize,
891    /// }
892    /// client.add_event_handler_context(MyContext { number: 5 });
893    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
894    ///     // Use the context
895    /// });
896    ///
897    /// // This will handle membership events in joined rooms. Invites are special, see below.
898    /// client.add_event_handler(
899    ///     |ev: SyncRoomMemberEvent| async move {},
900    /// );
901    ///
902    /// // To handle state events in invited rooms (including invite membership events),
903    /// // `StrippedRoomMemberEvent` should be used.
904    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
905    /// client.add_event_handler(
906    ///     |ev: StrippedRoomMemberEvent| async move {},
907    /// );
908    ///
909    /// // Custom events work exactly the same way, you just need to declare
910    /// // the content struct and use the EventContent derive macro on it.
911    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
912    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
913    /// struct TokenEventContent {
914    ///     token: String,
915    ///     #[serde(rename = "exp")]
916    ///     expires_at: MilliSecondsSinceUnixEpoch,
917    /// }
918    ///
919    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
920    ///     todo!("Display the token");
921    /// });
922    ///
923    /// // Event handler closures can also capture local variables.
924    /// // Make sure they are cheap to clone though, because they will be cloned
925    /// // every time the closure is called.
926    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
927    ///
928    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
929    ///     println!("Calling the handler with identifier {data}");
930    /// });
931    /// # }
932    /// ```
933    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
934    where
935        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
936        H: EventHandler<Ev, Ctx>,
937    {
938        self.add_event_handler_impl(handler, None)
939    }
940
941    /// Register a handler for a specific room, and event type.
942    ///
943    /// This method works the same way as
944    /// [`add_event_handler`][Self::add_event_handler], except that the handler
945    /// will only be called for events in the room with the specified ID. See
946    /// that method for more details on event handler functions.
947    ///
948    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
949    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
950    /// your use case.
951    pub fn add_room_event_handler<Ev, Ctx, H>(
952        &self,
953        room_id: &RoomId,
954        handler: H,
955    ) -> EventHandlerHandle
956    where
957        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
958        H: EventHandler<Ev, Ctx>,
959    {
960        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
961    }
962
963    /// Observe a specific event type.
964    ///
965    /// `Ev` represents the kind of event that will be observed. `Ctx`
966    /// represents the context that will come with the event. It relies on the
967    /// same mechanism as [`Client::add_event_handler`]. The main difference is
968    /// that it returns an [`ObservableEventHandler`] and doesn't require a
969    /// user-defined closure. It is possible to subscribe to the
970    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
971    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
972    /// Ctx)`.
973    ///
974    /// Be careful that only the most recent value can be observed. Subscribers
975    /// are notified when a new value is sent, but there is no guarantee
976    /// that they will see all values.
977    ///
978    /// # Example
979    ///
980    /// Let's see a classical usage:
981    ///
982    /// ```
983    /// use futures_util::StreamExt as _;
984    /// use matrix_sdk::{
985    ///     Client, Room,
986    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
987    /// };
988    ///
989    /// # async fn example(client: Client) -> Option<()> {
990    /// let observer =
991    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
992    ///
993    /// let mut subscriber = observer.subscribe();
994    ///
995    /// let (event, (room, push_actions)) = subscriber.next().await?;
996    /// # Some(())
997    /// # }
998    /// ```
999    ///
1000    /// Now let's see how to get several contexts that can be useful for you:
1001    ///
1002    /// ```
1003    /// use matrix_sdk::{
1004    ///     Client, Room,
1005    ///     deserialized_responses::EncryptionInfo,
1006    ///     ruma::{
1007    ///         events::room::{
1008    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1009    ///         },
1010    ///         push::Action,
1011    ///     },
1012    /// };
1013    ///
1014    /// # async fn example(client: Client) {
1015    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1016    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1017    ///
1018    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1019    /// // to distinguish between unencrypted events and events that were decrypted
1020    /// // by the SDK.
1021    /// let _ = client
1022    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1023    ///     );
1024    ///
1025    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1026    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1027    /// // should be highlighted in the timeline.
1028    /// let _ =
1029    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1030    ///
1031    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1032    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1033    /// # }
1034    /// ```
1035    ///
1036    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1037    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1038    where
1039        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1040        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1041    {
1042        self.observe_room_events_impl(None)
1043    }
1044
1045    /// Observe a specific room, and event type.
1046    ///
1047    /// This method works the same way as [`Client::observe_events`], except
1048    /// that the observability will only be applied for events in the room with
1049    /// the specified ID. See that method for more details.
1050    ///
1051    /// Be careful that only the most recent value can be observed. Subscribers
1052    /// are notified when a new value is sent, but there is no guarantee
1053    /// that they will see all values.
1054    pub fn observe_room_events<Ev, Ctx>(
1055        &self,
1056        room_id: &RoomId,
1057    ) -> ObservableEventHandler<(Ev, Ctx)>
1058    where
1059        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1060        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1061    {
1062        self.observe_room_events_impl(Some(room_id.to_owned()))
1063    }
1064
1065    /// Shared implementation for `Client::observe_events` and
1066    /// `Client::observe_room_events`.
1067    fn observe_room_events_impl<Ev, Ctx>(
1068        &self,
1069        room_id: Option<OwnedRoomId>,
1070    ) -> ObservableEventHandler<(Ev, Ctx)>
1071    where
1072        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1073        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1074    {
1075        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1076        // new value.
1077        let shared_observable = SharedObservable::new(None);
1078
1079        ObservableEventHandler::new(
1080            shared_observable.clone(),
1081            self.event_handler_drop_guard(self.add_event_handler_impl(
1082                move |event: Ev, context: Ctx| {
1083                    shared_observable.set(Some((event, context)));
1084
1085                    ready(())
1086                },
1087                room_id,
1088            )),
1089        )
1090    }
1091
1092    /// Remove the event handler associated with the handle.
1093    ///
1094    /// Note that you **must not** call `remove_event_handler` from the
1095    /// non-async part of an event handler, that is:
1096    ///
1097    /// ```ignore
1098    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1099    ///     // ⚠ this will cause a deadlock ⚠
1100    ///     client.remove_event_handler(handle);
1101    ///
1102    ///     async move {
1103    ///         // removing the event handler here is fine
1104    ///         client.remove_event_handler(handle);
1105    ///     }
1106    /// })
1107    /// ```
1108    ///
1109    /// Note also that handlers that remove themselves will still execute with
1110    /// events received in the same sync cycle.
1111    ///
1112    /// # Arguments
1113    ///
1114    /// `handle` - The [`EventHandlerHandle`] that is returned when
1115    /// registering the event handler with [`Client::add_event_handler`].
1116    ///
1117    /// # Examples
1118    ///
1119    /// ```no_run
1120    /// # use url::Url;
1121    /// # use tokio::sync::mpsc;
1122    /// #
1123    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1124    /// #
1125    /// use matrix_sdk::{
1126    ///     Client, event_handler::EventHandlerHandle,
1127    ///     ruma::events::room::member::SyncRoomMemberEvent,
1128    /// };
1129    /// #
1130    /// # futures_executor::block_on(async {
1131    /// # let client = matrix_sdk::Client::builder()
1132    /// #     .homeserver_url(homeserver)
1133    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1134    /// #     .build()
1135    /// #     .await
1136    /// #     .unwrap();
1137    ///
1138    /// client.add_event_handler(
1139    ///     |ev: SyncRoomMemberEvent,
1140    ///      client: Client,
1141    ///      handle: EventHandlerHandle| async move {
1142    ///         // Common usage: Check arriving Event is the expected one
1143    ///         println!("Expected RoomMemberEvent received!");
1144    ///         client.remove_event_handler(handle);
1145    ///     },
1146    /// );
1147    /// # });
1148    /// ```
1149    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1150        self.inner.event_handlers.remove(handle);
1151    }
1152
1153    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1154    /// the given handle.
1155    ///
1156    /// When the returned value is dropped, the event handler will be removed.
1157    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1158        EventHandlerDropGuard::new(handle, self.clone())
1159    }
1160
1161    /// Add an arbitrary value for use as event handler context.
1162    ///
1163    /// The value can be obtained in an event handler by adding an argument of
1164    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1165    ///
1166    /// If a value of the same type has been added before, it will be
1167    /// overwritten.
1168    ///
1169    /// # Examples
1170    ///
1171    /// ```no_run
1172    /// use matrix_sdk::{
1173    ///     Room, event_handler::Ctx,
1174    ///     ruma::events::room::message::SyncRoomMessageEvent,
1175    /// };
1176    /// # #[derive(Clone)]
1177    /// # struct SomeType;
1178    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1179    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1180    /// # futures_executor::block_on(async {
1181    /// # let client = matrix_sdk::Client::builder()
1182    /// #     .homeserver_url(homeserver)
1183    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1184    /// #     .build()
1185    /// #     .await
1186    /// #     .unwrap();
1187    ///
1188    /// // Handle used to send messages to the UI part of the app
1189    /// let my_gui_handle: SomeType = obtain_gui_handle();
1190    ///
1191    /// client.add_event_handler_context(my_gui_handle.clone());
1192    /// client.add_event_handler(
1193    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1194    ///         async move {
1195    ///             // gui_handle.send(DisplayMessage { message: ev });
1196    ///         }
1197    ///     },
1198    /// );
1199    /// # });
1200    /// ```
1201    pub fn add_event_handler_context<T>(&self, ctx: T)
1202    where
1203        T: Clone + Send + Sync + 'static,
1204    {
1205        self.inner.event_handlers.add_context(ctx);
1206    }
1207
1208    /// Register a handler for a notification.
1209    ///
1210    /// Similar to [`Client::add_event_handler`], but only allows functions
1211    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1212    /// [`Client`] for now.
1213    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1214    where
1215        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1216        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1217    {
1218        self.inner.notification_handlers.write().await.push(Box::new(
1219            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1220        ));
1221
1222        self
1223    }
1224
1225    /// Subscribe to all updates for the room with the given ID.
1226    ///
1227    /// The returned receiver will receive a new message for each sync response
1228    /// that contains updates for that room.
1229    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1230        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1231            btree_map::Entry::Vacant(entry) => {
1232                let (tx, rx) = broadcast::channel(8);
1233                entry.insert(tx);
1234                rx
1235            }
1236            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1237        }
1238    }
1239
1240    /// Subscribe to all updates to all rooms, whenever any has been received in
1241    /// a sync response.
1242    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1243        self.inner.room_updates_sender.subscribe()
1244    }
1245
1246    pub(crate) async fn notification_handlers(
1247        &self,
1248    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1249        self.inner.notification_handlers.read().await
1250    }
1251
1252    /// Get all the rooms the client knows about.
1253    ///
1254    /// This will return the list of joined, invited, and left rooms.
1255    pub fn rooms(&self) -> Vec<Room> {
1256        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1257    }
1258
1259    /// Get all the rooms the client knows about, filtered by room state.
1260    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1261        self.base_client()
1262            .rooms_filtered(filter)
1263            .into_iter()
1264            .map(|room| Room::new(self.clone(), room))
1265            .collect()
1266    }
1267
1268    /// Get a stream of all the rooms, in addition to the existing rooms.
1269    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1270        let (rooms, stream) = self.base_client().rooms_stream();
1271
1272        let map_room = |room| Room::new(self.clone(), room);
1273
1274        (
1275            rooms.into_iter().map(map_room).collect(),
1276            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1277        )
1278    }
1279
1280    /// Returns the joined rooms this client knows about.
1281    pub fn joined_rooms(&self) -> Vec<Room> {
1282        self.rooms_filtered(RoomStateFilter::JOINED)
1283    }
1284
1285    /// Returns the invited rooms this client knows about.
1286    pub fn invited_rooms(&self) -> Vec<Room> {
1287        self.rooms_filtered(RoomStateFilter::INVITED)
1288    }
1289
1290    /// Returns the left rooms this client knows about.
1291    pub fn left_rooms(&self) -> Vec<Room> {
1292        self.rooms_filtered(RoomStateFilter::LEFT)
1293    }
1294
1295    /// Returns the joined space rooms this client knows about.
1296    pub fn joined_space_rooms(&self) -> Vec<Room> {
1297        self.base_client()
1298            .rooms_filtered(RoomStateFilter::JOINED)
1299            .into_iter()
1300            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1301            .collect()
1302    }
1303
1304    /// Get a room with the given room id.
1305    ///
1306    /// # Arguments
1307    ///
1308    /// `room_id` - The unique id of the room that should be fetched.
1309    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1310        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1311    }
1312
1313    /// Gets the preview of a room, whether the current user has joined it or
1314    /// not.
1315    pub async fn get_room_preview(
1316        &self,
1317        room_or_alias_id: &RoomOrAliasId,
1318        via: Vec<OwnedServerName>,
1319    ) -> Result<RoomPreview> {
1320        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1321            Ok(room_id) => room_id.to_owned(),
1322            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1323        };
1324
1325        if let Some(room) = self.get_room(&room_id) {
1326            // The cached data can only be trusted if the room state is joined or
1327            // banned: for invite and knock rooms, no updates will be received
1328            // for the rooms after the invite/knock action took place so we may
1329            // have very out to date data for important fields such as
1330            // `join_rule`. For left rooms, the homeserver should return the latest info.
1331            match room.state() {
1332                RoomState::Joined | RoomState::Banned => {
1333                    return Ok(RoomPreview::from_known_room(&room).await);
1334                }
1335                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1336            }
1337        }
1338
1339        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1340    }
1341
1342    /// Resolve a room alias to a room id and a list of servers which know
1343    /// about it.
1344    ///
1345    /// # Arguments
1346    ///
1347    /// `room_alias` - The room alias to be resolved.
1348    pub async fn resolve_room_alias(
1349        &self,
1350        room_alias: &RoomAliasId,
1351    ) -> HttpResult<get_alias::v3::Response> {
1352        let request = get_alias::v3::Request::new(room_alias.to_owned());
1353        self.send(request).await
1354    }
1355
1356    /// Checks if a room alias is not in use yet.
1357    ///
1358    /// Returns:
1359    /// - `Ok(true)` if the room alias is available.
1360    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1361    ///   status code).
1362    /// - An `Err` otherwise.
1363    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1364        match self.resolve_room_alias(alias).await {
1365            // The room alias was resolved, so it's already in use.
1366            Ok(_) => Ok(false),
1367            Err(error) => {
1368                match error.client_api_error_kind() {
1369                    // The room alias wasn't found, so it's available.
1370                    Some(ErrorKind::NotFound) => Ok(true),
1371                    _ => Err(error),
1372                }
1373            }
1374        }
1375    }
1376
1377    /// Adds a new room alias associated with a room to the room directory.
1378    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1379        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1380        self.send(request).await?;
1381        Ok(())
1382    }
1383
1384    /// Removes a room alias from the room directory.
1385    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1386        let request = delete_alias::v3::Request::new(alias.to_owned());
1387        self.send(request).await?;
1388        Ok(())
1389    }
1390
1391    /// Update the homeserver from the login response well-known if needed.
1392    ///
1393    /// # Arguments
1394    ///
1395    /// * `login_well_known` - The `well_known` field from a successful login
1396    ///   response.
1397    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1398        if self.inner.respect_login_well_known
1399            && let Some(well_known) = login_well_known
1400            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1401        {
1402            self.set_homeserver(homeserver);
1403        }
1404    }
1405
1406    /// Similar to [`Client::restore_session_with`], with
1407    /// [`RoomLoadSettings::default()`].
1408    ///
1409    /// # Panics
1410    ///
1411    /// Panics if a session was already restored or logged in.
1412    #[instrument(skip_all)]
1413    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1414        self.restore_session_with(session, RoomLoadSettings::default()).await
1415    }
1416
1417    /// Restore a session previously logged-in using one of the available
1418    /// authentication APIs. The number of rooms to restore is controlled by
1419    /// [`RoomLoadSettings`].
1420    ///
1421    /// See the documentation of the corresponding authentication API's
1422    /// `restore_session` method for more information.
1423    ///
1424    /// # Panics
1425    ///
1426    /// Panics if a session was already restored or logged in.
1427    #[instrument(skip_all)]
1428    pub async fn restore_session_with(
1429        &self,
1430        session: impl Into<AuthSession>,
1431        room_load_settings: RoomLoadSettings,
1432    ) -> Result<()> {
1433        let session = session.into();
1434        match session {
1435            AuthSession::Matrix(session) => {
1436                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1437            }
1438            AuthSession::OAuth(session) => {
1439                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1440            }
1441        }
1442    }
1443
1444    /// Refresh the access token using the authentication API used to log into
1445    /// this session.
1446    ///
1447    /// See the documentation of the authentication API's `refresh_access_token`
1448    /// method for more information.
1449    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1450        let Some(auth_api) = self.auth_api() else {
1451            return Err(RefreshTokenError::RefreshTokenRequired);
1452        };
1453
1454        match auth_api {
1455            AuthApi::Matrix(api) => {
1456                trace!("Token refresh: Using the homeserver.");
1457                Box::pin(api.refresh_access_token()).await?;
1458            }
1459            AuthApi::OAuth(api) => {
1460                trace!("Token refresh: Using OAuth 2.0.");
1461                Box::pin(api.refresh_access_token()).await?;
1462            }
1463        }
1464
1465        Ok(())
1466    }
1467
1468    /// Log out the current session using the proper authentication API.
1469    ///
1470    /// # Errors
1471    ///
1472    /// Returns an error if the session is not authenticated or if an error
1473    /// occurred while making the request to the server.
1474    pub async fn logout(&self) -> Result<(), Error> {
1475        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1476        match auth_api {
1477            AuthApi::Matrix(matrix_auth) => {
1478                matrix_auth.logout().await?;
1479                Ok(())
1480            }
1481            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1482        }
1483    }
1484
1485    /// Get or upload a sync filter.
1486    ///
1487    /// This method will either get a filter ID from the store or upload the
1488    /// filter definition to the homeserver and return the new filter ID.
1489    ///
1490    /// # Arguments
1491    ///
1492    /// * `filter_name` - The unique name of the filter, this name will be used
1493    /// locally to store and identify the filter ID returned by the server.
1494    ///
1495    /// * `definition` - The filter definition that should be uploaded to the
1496    /// server if no filter ID can be found in the store.
1497    ///
1498    /// # Examples
1499    ///
1500    /// ```no_run
1501    /// # use matrix_sdk::{
1502    /// #    Client, config::SyncSettings,
1503    /// #    ruma::api::client::{
1504    /// #        filter::{
1505    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1506    /// #        },
1507    /// #        sync::sync_events::v3::Filter,
1508    /// #    }
1509    /// # };
1510    /// # use url::Url;
1511    /// # async {
1512    /// # let homeserver = Url::parse("http://example.com").unwrap();
1513    /// # let client = Client::new(homeserver).await.unwrap();
1514    /// let mut filter = FilterDefinition::default();
1515    ///
1516    /// // Let's enable member lazy loading.
1517    /// filter.room.state.lazy_load_options =
1518    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1519    ///
1520    /// let filter_id = client
1521    ///     .get_or_upload_filter("sync", filter)
1522    ///     .await
1523    ///     .unwrap();
1524    ///
1525    /// let sync_settings = SyncSettings::new()
1526    ///     .filter(Filter::FilterId(filter_id));
1527    ///
1528    /// let response = client.sync_once(sync_settings).await.unwrap();
1529    /// # };
1530    #[instrument(skip(self, definition))]
1531    pub async fn get_or_upload_filter(
1532        &self,
1533        filter_name: &str,
1534        definition: FilterDefinition,
1535    ) -> Result<String> {
1536        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1537            debug!("Found filter locally");
1538            Ok(filter)
1539        } else {
1540            debug!("Didn't find filter locally");
1541            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1542            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1543            let response = self.send(request).await?;
1544
1545            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1546
1547            Ok(response.filter_id)
1548        }
1549    }
1550
1551    /// Prepare to join a room by ID, by getting the current details about it
1552    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1553        let room = self.get_room(room_id)?;
1554
1555        let inviter = match room.invite_details().await {
1556            Ok(details) => details.inviter,
1557            Err(Error::WrongRoomState(_)) => None,
1558            Err(e) => {
1559                warn!("Error fetching invite details for room: {e:?}");
1560                None
1561            }
1562        };
1563
1564        Some(PreJoinRoomInfo { inviter })
1565    }
1566
1567    /// Finish joining a room.
1568    ///
1569    /// If the room was an invite that should be marked as a DM, will include it
1570    /// in the DM event after creating the joined room.
1571    ///
1572    /// If encrypted history sharing is enabled, will check to see if we have a
1573    /// key bundle, and import it if so.
1574    ///
1575    /// # Arguments
1576    ///
1577    /// * `room_id` - The `RoomId` of the room that was joined.
1578    /// * `pre_join_room_info` - Information about the room before we joined.
1579    async fn finish_join_room(
1580        &self,
1581        room_id: &RoomId,
1582        pre_join_room_info: Option<PreJoinRoomInfo>,
1583    ) -> Result<Room> {
1584        info!(?room_id, ?pre_join_room_info, "Completing room join");
1585        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1586            room.state() == RoomState::Invited
1587                && room.is_direct().await.unwrap_or_else(|e| {
1588                    warn!(%room_id, "is_direct() failed: {e}");
1589                    false
1590                })
1591        } else {
1592            false
1593        };
1594
1595        let base_room = self
1596            .base_client()
1597            .room_joined(
1598                room_id,
1599                pre_join_room_info
1600                    .as_ref()
1601                    .and_then(|info| info.inviter.as_ref())
1602                    .map(|i| i.user_id().to_owned()),
1603            )
1604            .await?;
1605        let room = Room::new(self.clone(), base_room);
1606
1607        if mark_as_dm {
1608            room.set_is_direct(true).await?;
1609        }
1610
1611        #[cfg(feature = "e2e-encryption")]
1612        if self.inner.enable_share_history_on_invite
1613            && let Some(inviter) =
1614                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1615        {
1616            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1617                .await?;
1618        }
1619
1620        // Suppress "unused variable" and "unused field" lints
1621        #[cfg(not(feature = "e2e-encryption"))]
1622        let _ = pre_join_room_info.map(|i| i.inviter);
1623
1624        Ok(room)
1625    }
1626
1627    /// Join a room by `RoomId`.
1628    ///
1629    /// Returns the `Room` in the joined state.
1630    ///
1631    /// # Arguments
1632    ///
1633    /// * `room_id` - The `RoomId` of the room to be joined.
1634    #[instrument(skip(self))]
1635    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1636        // See who invited us to this room, if anyone. Note we have to do this before
1637        // making the `/join` request, otherwise we could race against the sync.
1638        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1639
1640        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1641        let response = self.send(request).await?;
1642        self.finish_join_room(&response.room_id, pre_join_info).await
1643    }
1644
1645    /// Join a room by `RoomOrAliasId`.
1646    ///
1647    /// Returns the `Room` in the joined state.
1648    ///
1649    /// # Arguments
1650    ///
1651    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1652    ///   alias looks like `#name:example.com`.
1653    /// * `server_names` - The server names to be used for resolving the alias,
1654    ///   if needs be.
1655    #[instrument(skip(self))]
1656    pub async fn join_room_by_id_or_alias(
1657        &self,
1658        alias: &RoomOrAliasId,
1659        server_names: &[OwnedServerName],
1660    ) -> Result<Room> {
1661        let pre_join_info = {
1662            match alias.try_into() {
1663                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1664                Err(_) => {
1665                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1666                    // responding to an invitation to the room, and therefore don't need to handle
1667                    // things that happen as a result of invites.
1668                    None
1669                }
1670            }
1671        };
1672        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1673            via: server_names.to_owned(),
1674        });
1675        let response = self.send(request).await?;
1676        self.finish_join_room(&response.room_id, pre_join_info).await
1677    }
1678
1679    /// Search the homeserver's directory of public rooms.
1680    ///
1681    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1682    /// a `get_public_rooms::Response`.
1683    ///
1684    /// # Arguments
1685    ///
1686    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1687    ///
1688    /// * `since` - Pagination token from a previous request.
1689    ///
1690    /// * `server` - The name of the server, if `None` the requested server is
1691    ///   used.
1692    ///
1693    /// # Examples
1694    /// ```no_run
1695    /// use matrix_sdk::Client;
1696    /// # use url::Url;
1697    /// # let homeserver = Url::parse("http://example.com").unwrap();
1698    /// # let limit = Some(10);
1699    /// # let since = Some("since token");
1700    /// # let server = Some("servername.com".try_into().unwrap());
1701    /// # async {
1702    /// let mut client = Client::new(homeserver).await.unwrap();
1703    ///
1704    /// client.public_rooms(limit, since, server).await;
1705    /// # };
1706    /// ```
1707    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1708    pub async fn public_rooms(
1709        &self,
1710        limit: Option<u32>,
1711        since: Option<&str>,
1712        server: Option<&ServerName>,
1713    ) -> HttpResult<get_public_rooms::v3::Response> {
1714        let limit = limit.map(UInt::from);
1715
1716        let request = assign!(get_public_rooms::v3::Request::new(), {
1717            limit,
1718            since: since.map(ToOwned::to_owned),
1719            server: server.map(ToOwned::to_owned),
1720        });
1721        self.send(request).await
1722    }
1723
1724    /// Create a room with the given parameters.
1725    ///
1726    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1727    /// created room.
1728    ///
1729    /// If you want to create a direct message with one specific user, you can
1730    /// use [`create_dm`][Self::create_dm], which is more convenient than
1731    /// assembling the [`create_room::v3::Request`] yourself.
1732    ///
1733    /// If the `is_direct` field of the request is set to `true` and at least
1734    /// one user is invited, the room will be automatically added to the direct
1735    /// rooms in the account data.
1736    ///
1737    /// # Examples
1738    ///
1739    /// ```no_run
1740    /// use matrix_sdk::{
1741    ///     Client,
1742    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1743    /// };
1744    /// # use url::Url;
1745    /// #
1746    /// # async {
1747    /// # let homeserver = Url::parse("http://example.com").unwrap();
1748    /// let request = CreateRoomRequest::new();
1749    /// let client = Client::new(homeserver).await.unwrap();
1750    /// assert!(client.create_room(request).await.is_ok());
1751    /// # };
1752    /// ```
1753    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1754        let invite = request.invite.clone();
1755        let is_direct_room = request.is_direct;
1756        let response = self.send(request).await?;
1757
1758        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1759
1760        let joined_room = Room::new(self.clone(), base_room);
1761
1762        if is_direct_room
1763            && !invite.is_empty()
1764            && let Err(error) =
1765                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1766        {
1767            // FIXME: Retry in the background
1768            error!("Failed to mark room as DM: {error}");
1769        }
1770
1771        Ok(joined_room)
1772    }
1773
1774    /// Create a DM room.
1775    ///
1776    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1777    /// given user being invited, the room marked `is_direct` and both the
1778    /// creator and invitee getting the default maximum power level.
1779    ///
1780    /// If the `e2e-encryption` feature is enabled, the room will also be
1781    /// encrypted.
1782    ///
1783    /// # Arguments
1784    ///
1785    /// * `user_id` - The ID of the user to create a DM for.
1786    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1787        #[cfg(feature = "e2e-encryption")]
1788        let initial_state = vec![
1789            InitialStateEvent::with_empty_state_key(
1790                RoomEncryptionEventContent::with_recommended_defaults(),
1791            )
1792            .to_raw_any(),
1793        ];
1794
1795        #[cfg(not(feature = "e2e-encryption"))]
1796        let initial_state = vec![];
1797
1798        let request = assign!(create_room::v3::Request::new(), {
1799            invite: vec![user_id.to_owned()],
1800            is_direct: true,
1801            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1802            initial_state,
1803        });
1804
1805        self.create_room(request).await
1806    }
1807
1808    /// Get the existing DM room with the given user, if any.
1809    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1810        let rooms = self.joined_rooms();
1811
1812        // Find the room we share with the `user_id` and only with `user_id`
1813        let room = rooms.into_iter().find(|r| {
1814            let targets = r.direct_targets();
1815            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1816        });
1817
1818        trace!(?user_id, ?room, "Found DM room with user");
1819        room
1820    }
1821
1822    /// Search the homeserver's directory for public rooms with a filter.
1823    ///
1824    /// # Arguments
1825    ///
1826    /// * `room_search` - The easiest way to create this request is using the
1827    ///   `get_public_rooms_filtered::Request` itself.
1828    ///
1829    /// # Examples
1830    ///
1831    /// ```no_run
1832    /// # use url::Url;
1833    /// # use matrix_sdk::Client;
1834    /// # async {
1835    /// # let homeserver = Url::parse("http://example.com")?;
1836    /// use matrix_sdk::ruma::{
1837    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1838    /// };
1839    /// # let mut client = Client::new(homeserver).await?;
1840    ///
1841    /// let mut filter = Filter::new();
1842    /// filter.generic_search_term = Some("rust".to_owned());
1843    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1844    /// request.filter = filter;
1845    ///
1846    /// let response = client.public_rooms_filtered(request).await?;
1847    ///
1848    /// for room in response.chunk {
1849    ///     println!("Found room {room:?}");
1850    /// }
1851    /// # anyhow::Ok(()) };
1852    /// ```
1853    pub async fn public_rooms_filtered(
1854        &self,
1855        request: get_public_rooms_filtered::v3::Request,
1856    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1857        self.send(request).await
1858    }
1859
1860    /// Send an arbitrary request to the server, without updating client state.
1861    ///
1862    /// **Warning:** Because this method *does not* update the client state, it
1863    /// is important to make sure that you account for this yourself, and
1864    /// use wrapper methods where available.  This method should *only* be
1865    /// used if a wrapper method for the endpoint you'd like to use is not
1866    /// available.
1867    ///
1868    /// # Arguments
1869    ///
1870    /// * `request` - A filled out and valid request for the endpoint to be hit
1871    ///
1872    /// * `timeout` - An optional request timeout setting, this overrides the
1873    ///   default request setting if one was set.
1874    ///
1875    /// # Examples
1876    ///
1877    /// ```no_run
1878    /// # use matrix_sdk::{Client, config::SyncSettings};
1879    /// # use url::Url;
1880    /// # async {
1881    /// # let homeserver = Url::parse("http://localhost:8080")?;
1882    /// # let mut client = Client::new(homeserver).await?;
1883    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1884    ///
1885    /// // First construct the request you want to make
1886    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1887    /// // for all available Endpoints
1888    /// let user_id = user_id!("@example:localhost").to_owned();
1889    /// let request = profile::get_profile::v3::Request::new(user_id);
1890    ///
1891    /// // Start the request using Client::send()
1892    /// let response = client.send(request).await?;
1893    ///
1894    /// // Check the corresponding Response struct to find out what types are
1895    /// // returned
1896    /// # anyhow::Ok(()) };
1897    /// ```
1898    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1899    where
1900        Request: OutgoingRequest + Clone + Debug,
1901        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1902        Request::PathBuilder: SupportedPathBuilder,
1903        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1904        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1905    {
1906        SendRequest {
1907            client: self.clone(),
1908            request,
1909            config: None,
1910            send_progress: Default::default(),
1911        }
1912    }
1913
1914    pub(crate) async fn send_inner<Request>(
1915        &self,
1916        request: Request,
1917        config: Option<RequestConfig>,
1918        send_progress: SharedObservable<TransmissionProgress>,
1919    ) -> HttpResult<Request::IncomingResponse>
1920    where
1921        Request: OutgoingRequest + Debug,
1922        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1923        Request::PathBuilder: SupportedPathBuilder,
1924        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1925        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1926    {
1927        let homeserver = self.homeserver().to_string();
1928        let access_token = self.access_token();
1929        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1930
1931        let path_builder_input =
1932            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1933
1934        self.inner
1935            .http_client
1936            .send(
1937                request,
1938                config,
1939                homeserver,
1940                access_token.as_deref(),
1941                path_builder_input,
1942                send_progress,
1943            )
1944            .await
1945    }
1946
1947    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1948        _ = self
1949            .inner
1950            .auth_ctx
1951            .session_change_sender
1952            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1953    }
1954
1955    /// Fetches server versions from network; no caching.
1956    pub async fn fetch_server_versions(
1957        &self,
1958        request_config: Option<RequestConfig>,
1959    ) -> HttpResult<get_supported_versions::Response> {
1960        let server_versions = self
1961            .send_inner(get_supported_versions::Request::new(), request_config, Default::default())
1962            .await?;
1963
1964        Ok(server_versions)
1965    }
1966
1967    /// Fetches client well_known from network; no caching.
1968    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1969        let server_url_string = self
1970            .server()
1971            .unwrap_or(
1972                // Sometimes people configure their well-known directly on the homeserver so use
1973                // this as a fallback when the server name is unknown.
1974                &self.homeserver(),
1975            )
1976            .to_string();
1977
1978        let well_known = self
1979            .inner
1980            .http_client
1981            .send(
1982                discover_homeserver::Request::new(),
1983                Some(RequestConfig::short_retry()),
1984                server_url_string,
1985                None,
1986                (),
1987                Default::default(),
1988            )
1989            .await;
1990
1991        match well_known {
1992            Ok(well_known) => Some(well_known),
1993            Err(http_error) => {
1994                // It is perfectly valid to not have a well-known file.
1995                // Maybe we should check for a specific error code to be sure?
1996                warn!("Failed to fetch client well-known: {http_error}");
1997                None
1998            }
1999        }
2000    }
2001
2002    /// Load server info from storage, or fetch them from network and cache
2003    /// them.
2004    async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
2005        match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
2006            Ok(Some(stored)) => {
2007                if let Some(server_info) =
2008                    stored.into_server_info().and_then(|info| info.maybe_decode())
2009                {
2010                    return Ok(server_info);
2011                }
2012            }
2013            Ok(None) => {
2014                // fallthrough: cache is empty
2015            }
2016            Err(err) => {
2017                warn!("error when loading cached server info: {err}");
2018                // fallthrough to network.
2019            }
2020        }
2021
2022        let server_versions = self.fetch_server_versions(None).await?;
2023        let well_known = self.fetch_client_well_known().await;
2024        let server_info = ServerInfo::new(
2025            server_versions.versions.clone(),
2026            server_versions.unstable_features.clone(),
2027            well_known.map(Into::into),
2028        );
2029
2030        // Attempt to cache the result in storage.
2031        {
2032            if let Err(err) = self
2033                .state_store()
2034                .set_kv_data(
2035                    StateStoreDataKey::ServerInfo,
2036                    StateStoreDataValue::ServerInfo(server_info.clone()),
2037                )
2038                .await
2039            {
2040                warn!("error when caching server info: {err}");
2041            }
2042        }
2043
2044        Ok(server_info)
2045    }
2046
2047    pub(crate) async fn get_cached_versions(&self) -> Option<SupportedVersions> {
2048        let server_info = &self.inner.caches.server_info;
2049
2050        if let CachedValue::Cached(val) = &server_info.read().await.supported_versions {
2051            Some(val.clone())
2052        } else {
2053            None
2054        }
2055    }
2056
2057    async fn get_or_load_and_cache_server_info<
2058        Value,
2059        MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
2060    >(
2061        &self,
2062        map: MapFunction,
2063    ) -> HttpResult<Value> {
2064        let server_info = &self.inner.caches.server_info;
2065        if let CachedValue::Cached(val) = map(&*server_info.read().await) {
2066            return Ok(val);
2067        }
2068
2069        let mut guarded_server_info = server_info.write().await;
2070        if let CachedValue::Cached(val) = map(&guarded_server_info) {
2071            return Ok(val);
2072        }
2073
2074        let server_info = self.load_or_fetch_server_info().await?;
2075
2076        // Fill both unstable features and server versions at once.
2077        let mut supported = server_info.supported_versions();
2078        if supported.versions.is_empty() {
2079            supported.versions = [MatrixVersion::V1_0].into();
2080        }
2081
2082        guarded_server_info.supported_versions = CachedValue::Cached(supported);
2083        guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2084
2085        // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2086        // fetch an optional property), the function will always return some.
2087        Ok(map(&guarded_server_info).unwrap_cached_value())
2088    }
2089
2090    /// Get the Matrix versions and features supported by the homeserver by
2091    /// fetching them from the server or the cache.
2092    ///
2093    /// This is equivalent to calling both [`Client::server_versions()`] and
2094    /// [`Client::unstable_features()`]. To always fetch the result from the
2095    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2096    /// and then `.as_supported_versions()` on the response.
2097    ///
2098    /// # Examples
2099    ///
2100    /// ```no_run
2101    /// use ruma::api::{FeatureFlag, MatrixVersion};
2102    /// # use matrix_sdk::{Client, config::SyncSettings};
2103    /// # use url::Url;
2104    /// # async {
2105    /// # let homeserver = Url::parse("http://localhost:8080")?;
2106    /// # let mut client = Client::new(homeserver).await?;
2107    ///
2108    /// let supported = client.supported_versions().await?;
2109    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2110    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2111    ///
2112    /// let msc_x_feature = FeatureFlag::from("msc_x");
2113    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2114    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2115    /// # anyhow::Ok(()) };
2116    /// ```
2117    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2118        self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2119            .await
2120    }
2121
2122    /// Get the Matrix versions and features supported by the homeserver by
2123    /// fetching them from the cache.
2124    ///
2125    /// For a version of this function that fetches the supported versions and
2126    /// features from the homeserver if the [`SupportedVersions`] aren't
2127    /// found in the cache, take a look at the [`Client::server_versions()`]
2128    /// method.
2129    ///
2130    /// # Examples
2131    ///
2132    /// ```no_run
2133    /// use ruma::api::{FeatureFlag, MatrixVersion};
2134    /// # use matrix_sdk::{Client, config::SyncSettings};
2135    /// # use url::Url;
2136    /// # async {
2137    /// # let homeserver = Url::parse("http://localhost:8080")?;
2138    /// # let mut client = Client::new(homeserver).await?;
2139    ///
2140    /// let supported =
2141    ///     if let Some(supported) = client.supported_versions_cached().await? {
2142    ///         supported
2143    ///     } else {
2144    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2145    ///     };
2146    ///
2147    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2148    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2149    ///
2150    /// let msc_x_feature = FeatureFlag::from("msc_x");
2151    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2152    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2153    /// # anyhow::Ok(()) };
2154    /// ```
2155    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2156        if let Some(cached) = self.get_cached_versions().await {
2157            Ok(Some(cached))
2158        } else if let Some(stored) =
2159            self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await?
2160        {
2161            if let Some(server_info) =
2162                stored.into_server_info().and_then(|info| info.maybe_decode())
2163            {
2164                Ok(Some(server_info.supported_versions()))
2165            } else {
2166                Ok(None)
2167            }
2168        } else {
2169            Ok(None)
2170        }
2171    }
2172
2173    /// Get the Matrix versions supported by the homeserver by fetching them
2174    /// from the server or the cache.
2175    ///
2176    /// # Examples
2177    ///
2178    /// ```no_run
2179    /// use ruma::api::MatrixVersion;
2180    /// # use matrix_sdk::{Client, config::SyncSettings};
2181    /// # use url::Url;
2182    /// # async {
2183    /// # let homeserver = Url::parse("http://localhost:8080")?;
2184    /// # let mut client = Client::new(homeserver).await?;
2185    ///
2186    /// let server_versions = client.server_versions().await?;
2187    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2188    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2189    /// # anyhow::Ok(()) };
2190    /// ```
2191    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2192        self.get_or_load_and_cache_server_info(|server_info| {
2193            server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2194        })
2195        .await
2196    }
2197
2198    /// Get the unstable features supported by the homeserver by fetching them
2199    /// from the server or the cache.
2200    ///
2201    /// # Examples
2202    ///
2203    /// ```no_run
2204    /// use matrix_sdk::ruma::api::FeatureFlag;
2205    /// # use matrix_sdk::{Client, config::SyncSettings};
2206    /// # use url::Url;
2207    /// # async {
2208    /// # let homeserver = Url::parse("http://localhost:8080")?;
2209    /// # let mut client = Client::new(homeserver).await?;
2210    ///
2211    /// let msc_x_feature = FeatureFlag::from("msc_x");
2212    /// let unstable_features = client.unstable_features().await?;
2213    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2214    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2215    /// # anyhow::Ok(()) };
2216    /// ```
2217    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2218        self.get_or_load_and_cache_server_info(|server_info| {
2219            server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2220        })
2221        .await
2222    }
2223
2224    /// Get information about the homeserver's advertised RTC foci by fetching
2225    /// the well-known file from the server or the cache.
2226    ///
2227    /// # Examples
2228    /// ```no_run
2229    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2230    /// # use url::Url;
2231    /// # async {
2232    /// # let homeserver = Url::parse("http://localhost:8080")?;
2233    /// # let mut client = Client::new(homeserver).await?;
2234    /// let rtc_foci = client.rtc_foci().await?;
2235    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2236    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2237    ///     _ => None,
2238    /// });
2239    /// if let Some(info) = default_livekit_focus_info {
2240    ///     println!("Default LiveKit service URL: {}", info.service_url);
2241    /// }
2242    /// # anyhow::Ok(()) };
2243    /// ```
2244    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2245        let well_known = self
2246            .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2247            .await?;
2248
2249        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2250    }
2251
2252    /// Empty the server version and unstable features cache.
2253    ///
2254    /// Since the SDK caches server info (versions, unstable features,
2255    /// well-known etc), it's possible to have a stale entry in the cache. This
2256    /// functions makes it possible to force reset it.
2257    pub async fn reset_server_info(&self) -> Result<()> {
2258        // Empty the in-memory caches.
2259        let mut guard = self.inner.caches.server_info.write().await;
2260        guard.supported_versions = CachedValue::NotSet;
2261
2262        // Empty the store cache.
2263        Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2264    }
2265
2266    /// Check whether MSC 4028 is enabled on the homeserver.
2267    ///
2268    /// # Examples
2269    ///
2270    /// ```no_run
2271    /// # use matrix_sdk::{Client, config::SyncSettings};
2272    /// # use url::Url;
2273    /// # async {
2274    /// # let homeserver = Url::parse("http://localhost:8080")?;
2275    /// # let mut client = Client::new(homeserver).await?;
2276    /// let msc4028_enabled =
2277    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2278    /// # anyhow::Ok(()) };
2279    /// ```
2280    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2281        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2282    }
2283
2284    /// Get information of all our own devices.
2285    ///
2286    /// # Examples
2287    ///
2288    /// ```no_run
2289    /// # use matrix_sdk::{Client, config::SyncSettings};
2290    /// # use url::Url;
2291    /// # async {
2292    /// # let homeserver = Url::parse("http://localhost:8080")?;
2293    /// # let mut client = Client::new(homeserver).await?;
2294    /// let response = client.devices().await?;
2295    ///
2296    /// for device in response.devices {
2297    ///     println!(
2298    ///         "Device: {} {}",
2299    ///         device.device_id,
2300    ///         device.display_name.as_deref().unwrap_or("")
2301    ///     );
2302    /// }
2303    /// # anyhow::Ok(()) };
2304    /// ```
2305    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2306        let request = get_devices::v3::Request::new();
2307
2308        self.send(request).await
2309    }
2310
2311    /// Delete the given devices from the server.
2312    ///
2313    /// # Arguments
2314    ///
2315    /// * `devices` - The list of devices that should be deleted from the
2316    ///   server.
2317    ///
2318    /// * `auth_data` - This request requires user interactive auth, the first
2319    ///   request needs to set this to `None` and will always fail with an
2320    ///   `UiaaResponse`. The response will contain information for the
2321    ///   interactive auth and the same request needs to be made but this time
2322    ///   with some `auth_data` provided.
2323    ///
2324    /// ```no_run
2325    /// # use matrix_sdk::{
2326    /// #    ruma::{api::client::uiaa, device_id},
2327    /// #    Client, Error, config::SyncSettings,
2328    /// # };
2329    /// # use serde_json::json;
2330    /// # use url::Url;
2331    /// # use std::collections::BTreeMap;
2332    /// # async {
2333    /// # let homeserver = Url::parse("http://localhost:8080")?;
2334    /// # let mut client = Client::new(homeserver).await?;
2335    /// let devices = &[device_id!("DEVICEID").to_owned()];
2336    ///
2337    /// if let Err(e) = client.delete_devices(devices, None).await {
2338    ///     if let Some(info) = e.as_uiaa_response() {
2339    ///         let mut password = uiaa::Password::new(
2340    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2341    ///             "wordpass".to_owned(),
2342    ///         );
2343    ///         password.session = info.session.clone();
2344    ///
2345    ///         client
2346    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2347    ///             .await?;
2348    ///     }
2349    /// }
2350    /// # anyhow::Ok(()) };
2351    pub async fn delete_devices(
2352        &self,
2353        devices: &[OwnedDeviceId],
2354        auth_data: Option<uiaa::AuthData>,
2355    ) -> HttpResult<delete_devices::v3::Response> {
2356        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2357        request.auth = auth_data;
2358
2359        self.send(request).await
2360    }
2361
2362    /// Change the display name of a device owned by the current user.
2363    ///
2364    /// Returns a `update_device::Response` which specifies the result
2365    /// of the operation.
2366    ///
2367    /// # Arguments
2368    ///
2369    /// * `device_id` - The ID of the device to change the display name of.
2370    /// * `display_name` - The new display name to set.
2371    pub async fn rename_device(
2372        &self,
2373        device_id: &DeviceId,
2374        display_name: &str,
2375    ) -> HttpResult<update_device::v3::Response> {
2376        let mut request = update_device::v3::Request::new(device_id.to_owned());
2377        request.display_name = Some(display_name.to_owned());
2378
2379        self.send(request).await
2380    }
2381
2382    /// Check whether a device with a specific ID exists on the server.
2383    ///
2384    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2385    /// with 404 and the underlying error otherwise.
2386    ///
2387    /// # Arguments
2388    ///
2389    /// * `device_id` - The ID of the device to query.
2390    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2391        let request = device::get_device::v3::Request::new(device_id);
2392        match self.send(request).await {
2393            Ok(_) => Ok(true),
2394            Err(err) => {
2395                if let Some(error) = err.as_client_api_error()
2396                    && error.status_code == 404
2397                {
2398                    Ok(false)
2399                } else {
2400                    Err(err.into())
2401                }
2402            }
2403        }
2404    }
2405
2406    /// Synchronize the client's state with the latest state on the server.
2407    ///
2408    /// ## Syncing Events
2409    ///
2410    /// Messages or any other type of event need to be periodically fetched from
2411    /// the server, this is achieved by sending a `/sync` request to the server.
2412    ///
2413    /// The first sync is sent out without a [`token`]. The response of the
2414    /// first sync will contain a [`next_batch`] field which should then be
2415    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2416    /// don't receive the same events multiple times.
2417    ///
2418    /// ## Long Polling
2419    ///
2420    /// A sync should in the usual case always be in flight. The
2421    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2422    /// long the server will wait for new events before it will respond.
2423    /// The server will respond immediately if some new events arrive before the
2424    /// timeout has expired. If no changes arrive and the timeout expires an
2425    /// empty sync response will be sent to the client.
2426    ///
2427    /// This method of sending a request that may not receive a response
2428    /// immediately is called long polling.
2429    ///
2430    /// ## Filtering Events
2431    ///
2432    /// The number or type of messages and events that the client should receive
2433    /// from the server can be altered using a [`Filter`].
2434    ///
2435    /// Filters can be non-trivial and, since they will be sent with every sync
2436    /// request, they may take up a bunch of unnecessary bandwidth.
2437    ///
2438    /// Luckily filters can be uploaded to the server and reused using an unique
2439    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2440    /// method.
2441    ///
2442    /// # Arguments
2443    ///
2444    /// * `sync_settings` - Settings for the sync call, this allows us to set
2445    /// various options to configure the sync:
2446    ///     * [`filter`] - To configure which events we receive and which get
2447    ///       [filtered] by the server
2448    ///     * [`timeout`] - To configure our [long polling] setup.
2449    ///     * [`token`] - To tell the server which events we already received
2450    ///       and where we wish to continue syncing.
2451    ///     * [`full_state`] - To tell the server that we wish to receive all
2452    ///       state events, regardless of our configured [`token`].
2453    ///     * [`set_presence`] - To tell the server to set the presence and to
2454    ///       which state.
2455    ///
2456    /// # Examples
2457    ///
2458    /// ```no_run
2459    /// # use url::Url;
2460    /// # async {
2461    /// # let homeserver = Url::parse("http://localhost:8080")?;
2462    /// # let username = "";
2463    /// # let password = "";
2464    /// use matrix_sdk::{
2465    ///     Client, config::SyncSettings,
2466    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2467    /// };
2468    ///
2469    /// let client = Client::new(homeserver).await?;
2470    /// client.matrix_auth().login_username(username, password).send().await?;
2471    ///
2472    /// // Sync once so we receive the client state and old messages.
2473    /// client.sync_once(SyncSettings::default()).await?;
2474    ///
2475    /// // Register our handler so we start responding once we receive a new
2476    /// // event.
2477    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2478    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2479    /// });
2480    ///
2481    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2482    /// // from our `sync_once()` call automatically.
2483    /// client.sync(SyncSettings::default()).await;
2484    /// # anyhow::Ok(()) };
2485    /// ```
2486    ///
2487    /// [`sync`]: #method.sync
2488    /// [`SyncSettings`]: crate::config::SyncSettings
2489    /// [`token`]: crate::config::SyncSettings#method.token
2490    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2491    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2492    /// [`set_presence`]: ruma::presence::PresenceState
2493    /// [`filter`]: crate::config::SyncSettings#method.filter
2494    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2495    /// [`next_batch`]: SyncResponse#structfield.next_batch
2496    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2497    /// [long polling]: #long-polling
2498    /// [filtered]: #filtering-events
2499    #[instrument(skip(self))]
2500    pub async fn sync_once(
2501        &self,
2502        sync_settings: crate::config::SyncSettings,
2503    ) -> Result<SyncResponse> {
2504        // The sync might not return for quite a while due to the timeout.
2505        // We'll see if there's anything crypto related to send out before we
2506        // sync, i.e. if we closed our client after a sync but before the
2507        // crypto requests were sent out.
2508        //
2509        // This will mostly be a no-op.
2510        #[cfg(feature = "e2e-encryption")]
2511        if let Err(e) = self.send_outgoing_requests().await {
2512            error!(error = ?e, "Error while sending outgoing E2EE requests");
2513        }
2514
2515        let token = match sync_settings.token {
2516            SyncToken::Specific(token) => Some(token),
2517            SyncToken::NoToken => None,
2518            SyncToken::ReusePrevious => self.sync_token().await,
2519        };
2520
2521        let request = assign!(sync_events::v3::Request::new(), {
2522            filter: sync_settings.filter.map(|f| *f),
2523            since: token,
2524            full_state: sync_settings.full_state,
2525            set_presence: sync_settings.set_presence,
2526            timeout: sync_settings.timeout,
2527            use_state_after: true,
2528        });
2529        let mut request_config = self.request_config();
2530        if let Some(timeout) = sync_settings.timeout {
2531            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2532            request_config.timeout = Some(base_timeout + timeout);
2533        }
2534
2535        let response = self.send(request).with_request_config(request_config).await?;
2536        let next_batch = response.next_batch.clone();
2537        let response = self.process_sync(response).await?;
2538
2539        #[cfg(feature = "e2e-encryption")]
2540        if let Err(e) = self.send_outgoing_requests().await {
2541            error!(error = ?e, "Error while sending outgoing E2EE requests");
2542        }
2543
2544        self.inner.sync_beat.notify(usize::MAX);
2545
2546        Ok(SyncResponse::new(next_batch, response))
2547    }
2548
2549    /// Repeatedly synchronize the client state with the server.
2550    ///
2551    /// This method will only return on error, if cancellation is needed
2552    /// the method should be wrapped in a cancelable task or the
2553    /// [`Client::sync_with_callback`] method can be used or
2554    /// [`Client::sync_with_result_callback`] if you want to handle error
2555    /// cases in the loop, too.
2556    ///
2557    /// This method will internally call [`Client::sync_once`] in a loop.
2558    ///
2559    /// This method can be used with the [`Client::add_event_handler`]
2560    /// method to react to individual events. If you instead wish to handle
2561    /// events in a bulk manner the [`Client::sync_with_callback`],
2562    /// [`Client::sync_with_result_callback`] and
2563    /// [`Client::sync_stream`] methods can be used instead. Those methods
2564    /// repeatedly return the whole sync response.
2565    ///
2566    /// # Arguments
2567    ///
2568    /// * `sync_settings` - Settings for the sync call. *Note* that those
2569    ///   settings will be only used for the first sync call. See the argument
2570    ///   docs for [`Client::sync_once`] for more info.
2571    ///
2572    /// # Return
2573    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2574    /// up to the user of the API to check the error and decide whether the sync
2575    /// should continue or not.
2576    ///
2577    /// # Examples
2578    ///
2579    /// ```no_run
2580    /// # use url::Url;
2581    /// # async {
2582    /// # let homeserver = Url::parse("http://localhost:8080")?;
2583    /// # let username = "";
2584    /// # let password = "";
2585    /// use matrix_sdk::{
2586    ///     Client, config::SyncSettings,
2587    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2588    /// };
2589    ///
2590    /// let client = Client::new(homeserver).await?;
2591    /// client.matrix_auth().login_username(&username, &password).send().await?;
2592    ///
2593    /// // Register our handler so we start responding once we receive a new
2594    /// // event.
2595    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2596    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2597    /// });
2598    ///
2599    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2600    /// // automatically.
2601    /// client.sync(SyncSettings::default()).await?;
2602    /// # anyhow::Ok(()) };
2603    /// ```
2604    ///
2605    /// [argument docs]: #method.sync_once
2606    /// [`sync_with_callback`]: #method.sync_with_callback
2607    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2608        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2609    }
2610
2611    /// Repeatedly call sync to synchronize the client state with the server.
2612    ///
2613    /// # Arguments
2614    ///
2615    /// * `sync_settings` - Settings for the sync call. *Note* that those
2616    ///   settings will be only used for the first sync call. See the argument
2617    ///   docs for [`Client::sync_once`] for more info.
2618    ///
2619    /// * `callback` - A callback that will be called every time a successful
2620    ///   response has been fetched from the server. The callback must return a
2621    ///   boolean which signalizes if the method should stop syncing. If the
2622    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2623    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2624    ///
2625    /// # Return
2626    /// The sync runs until an error occurs or the
2627    /// callback indicates that the Loop should stop. If the callback asked for
2628    /// a regular stop, the result will be `Ok(())` otherwise the
2629    /// `Err(Error)` is returned.
2630    ///
2631    /// # Examples
2632    ///
2633    /// The following example demonstrates how to sync forever while sending all
2634    /// the interesting events through a mpsc channel to another thread e.g. a
2635    /// UI thread.
2636    ///
2637    /// ```no_run
2638    /// # use std::time::Duration;
2639    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2640    /// # use url::Url;
2641    /// # async {
2642    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2643    /// # let mut client = Client::new(homeserver).await.unwrap();
2644    ///
2645    /// use tokio::sync::mpsc::channel;
2646    ///
2647    /// let (tx, rx) = channel(100);
2648    ///
2649    /// let sync_channel = &tx;
2650    /// let sync_settings = SyncSettings::new()
2651    ///     .timeout(Duration::from_secs(30));
2652    ///
2653    /// client
2654    ///     .sync_with_callback(sync_settings, |response| async move {
2655    ///         let channel = sync_channel;
2656    ///         for (room_id, room) in response.rooms.joined {
2657    ///             for event in room.timeline.events {
2658    ///                 channel.send(event).await.unwrap();
2659    ///             }
2660    ///         }
2661    ///
2662    ///         LoopCtrl::Continue
2663    ///     })
2664    ///     .await;
2665    /// };
2666    /// ```
2667    #[instrument(skip_all)]
2668    pub async fn sync_with_callback<C>(
2669        &self,
2670        sync_settings: crate::config::SyncSettings,
2671        callback: impl Fn(SyncResponse) -> C,
2672    ) -> Result<(), Error>
2673    where
2674        C: Future<Output = LoopCtrl>,
2675    {
2676        self.sync_with_result_callback(sync_settings, |result| async {
2677            Ok(callback(result?).await)
2678        })
2679        .await
2680    }
2681
2682    /// Repeatedly call sync to synchronize the client state with the server.
2683    ///
2684    /// # Arguments
2685    ///
2686    /// * `sync_settings` - Settings for the sync call. *Note* that those
2687    ///   settings will be only used for the first sync call. See the argument
2688    ///   docs for [`Client::sync_once`] for more info.
2689    ///
2690    /// * `callback` - A callback that will be called every time after a
2691    ///   response has been received, failure or not. The callback returns a
2692    ///   `Result<LoopCtrl, Error>`, too. When returning
2693    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2694    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2695    ///   function returns `Ok(())`. In case the callback can't handle the
2696    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2697    ///   which results in the sync ending and the `Err(Error)` being returned.
2698    ///
2699    /// # Return
2700    /// The sync runs until an error occurs that the callback can't handle or
2701    /// the callback indicates that the Loop should stop. If the callback
2702    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2703    /// `Err(Error)` is returned.
2704    ///
2705    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2706    /// this, and are handled first without sending the result to the
2707    /// callback. Only after they have exceeded is the `Result` handed to
2708    /// the callback.
2709    ///
2710    /// # Examples
2711    ///
2712    /// The following example demonstrates how to sync forever while sending all
2713    /// the interesting events through a mpsc channel to another thread e.g. a
2714    /// UI thread.
2715    ///
2716    /// ```no_run
2717    /// # use std::time::Duration;
2718    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2719    /// # use url::Url;
2720    /// # async {
2721    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2722    /// # let mut client = Client::new(homeserver).await.unwrap();
2723    /// #
2724    /// use tokio::sync::mpsc::channel;
2725    ///
2726    /// let (tx, rx) = channel(100);
2727    ///
2728    /// let sync_channel = &tx;
2729    /// let sync_settings = SyncSettings::new()
2730    ///     .timeout(Duration::from_secs(30));
2731    ///
2732    /// client
2733    ///     .sync_with_result_callback(sync_settings, |response| async move {
2734    ///         let channel = sync_channel;
2735    ///         let sync_response = response?;
2736    ///         for (room_id, room) in sync_response.rooms.joined {
2737    ///              for event in room.timeline.events {
2738    ///                  channel.send(event).await.unwrap();
2739    ///               }
2740    ///         }
2741    ///
2742    ///         Ok(LoopCtrl::Continue)
2743    ///     })
2744    ///     .await;
2745    /// };
2746    /// ```
2747    #[instrument(skip(self, callback))]
2748    pub async fn sync_with_result_callback<C>(
2749        &self,
2750        sync_settings: crate::config::SyncSettings,
2751        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2752    ) -> Result<(), Error>
2753    where
2754        C: Future<Output = Result<LoopCtrl, Error>>,
2755    {
2756        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2757
2758        while let Some(result) = sync_stream.next().await {
2759            trace!("Running callback");
2760            if callback(result).await? == LoopCtrl::Break {
2761                trace!("Callback told us to stop");
2762                break;
2763            }
2764            trace!("Done running callback");
2765        }
2766
2767        Ok(())
2768    }
2769
2770    //// Repeatedly synchronize the client state with the server.
2771    ///
2772    /// This method will internally call [`Client::sync_once`] in a loop and is
2773    /// equivalent to the [`Client::sync`] method but the responses are provided
2774    /// as an async stream.
2775    ///
2776    /// # Arguments
2777    ///
2778    /// * `sync_settings` - Settings for the sync call. *Note* that those
2779    ///   settings will be only used for the first sync call. See the argument
2780    ///   docs for [`Client::sync_once`] for more info.
2781    ///
2782    /// # Examples
2783    ///
2784    /// ```no_run
2785    /// # use url::Url;
2786    /// # async {
2787    /// # let homeserver = Url::parse("http://localhost:8080")?;
2788    /// # let username = "";
2789    /// # let password = "";
2790    /// use futures_util::StreamExt;
2791    /// use matrix_sdk::{Client, config::SyncSettings};
2792    ///
2793    /// let client = Client::new(homeserver).await?;
2794    /// client.matrix_auth().login_username(&username, &password).send().await?;
2795    ///
2796    /// let mut sync_stream =
2797    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2798    ///
2799    /// while let Some(Ok(response)) = sync_stream.next().await {
2800    ///     for room in response.rooms.joined.values() {
2801    ///         for e in &room.timeline.events {
2802    ///             if let Ok(event) = e.raw().deserialize() {
2803    ///                 println!("Received event {:?}", event);
2804    ///             }
2805    ///         }
2806    ///     }
2807    /// }
2808    ///
2809    /// # anyhow::Ok(()) };
2810    /// ```
2811    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2812    #[instrument(skip(self))]
2813    pub async fn sync_stream(
2814        &self,
2815        mut sync_settings: crate::config::SyncSettings,
2816    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2817        let mut is_first_sync = true;
2818        let mut timeout = None;
2819        let mut last_sync_time: Option<Instant> = None;
2820
2821        let parent_span = Span::current();
2822
2823        async_stream::stream!({
2824            loop {
2825                trace!("Syncing");
2826
2827                if sync_settings.ignore_timeout_on_first_sync {
2828                    if is_first_sync {
2829                        timeout = sync_settings.timeout.take();
2830                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
2831                        sync_settings.timeout = timeout.take();
2832                    }
2833
2834                    is_first_sync = false;
2835                }
2836
2837                yield self
2838                    .sync_loop_helper(&mut sync_settings)
2839                    .instrument(parent_span.clone())
2840                    .await;
2841
2842                Client::delay_sync(&mut last_sync_time).await
2843            }
2844        })
2845    }
2846
2847    /// Get the current, if any, sync token of the client.
2848    /// This will be None if the client didn't sync at least once.
2849    pub(crate) async fn sync_token(&self) -> Option<String> {
2850        self.inner.base_client.sync_token().await
2851    }
2852
2853    /// Gets information about the owner of a given access token.
2854    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2855        let request = whoami::v3::Request::new();
2856        self.send(request).await
2857    }
2858
2859    /// Subscribes a new receiver to client SessionChange broadcasts.
2860    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2861        let broadcast = &self.auth_ctx().session_change_sender;
2862        broadcast.subscribe()
2863    }
2864
2865    /// Sets the save/restore session callbacks.
2866    ///
2867    /// This is another mechanism to get synchronous updates to session tokens,
2868    /// while [`Self::subscribe_to_session_changes`] provides an async update.
2869    pub fn set_session_callbacks(
2870        &self,
2871        reload_session_callback: Box<ReloadSessionCallback>,
2872        save_session_callback: Box<SaveSessionCallback>,
2873    ) -> Result<()> {
2874        self.inner
2875            .auth_ctx
2876            .reload_session_callback
2877            .set(reload_session_callback)
2878            .map_err(|_| Error::MultipleSessionCallbacks)?;
2879
2880        self.inner
2881            .auth_ctx
2882            .save_session_callback
2883            .set(save_session_callback)
2884            .map_err(|_| Error::MultipleSessionCallbacks)?;
2885
2886        Ok(())
2887    }
2888
2889    /// Get the notification settings of the current owner of the client.
2890    pub async fn notification_settings(&self) -> NotificationSettings {
2891        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2892        NotificationSettings::new(self.clone(), ruleset)
2893    }
2894
2895    /// Create a new specialized `Client` that can process notifications.
2896    ///
2897    /// See [`CrossProcessLock::new`] to learn more about
2898    /// `cross_process_store_locks_holder_name`.
2899    ///
2900    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
2901    pub async fn notification_client(
2902        &self,
2903        cross_process_store_locks_holder_name: String,
2904    ) -> Result<Client> {
2905        let client = Client {
2906            inner: ClientInner::new(
2907                self.inner.auth_ctx.clone(),
2908                self.server().cloned(),
2909                self.homeserver(),
2910                self.sliding_sync_version(),
2911                self.inner.http_client.clone(),
2912                self.inner
2913                    .base_client
2914                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2915                    .await?,
2916                self.inner.caches.server_info.read().await.clone(),
2917                self.inner.respect_login_well_known,
2918                self.inner.event_cache.clone(),
2919                self.inner.send_queue_data.clone(),
2920                self.inner.latest_events.clone(),
2921                #[cfg(feature = "e2e-encryption")]
2922                self.inner.e2ee.encryption_settings,
2923                #[cfg(feature = "e2e-encryption")]
2924                self.inner.enable_share_history_on_invite,
2925                cross_process_store_locks_holder_name,
2926                #[cfg(feature = "experimental-search")]
2927                self.inner.search_index.clone(),
2928                self.inner.thread_subscription_catchup.clone(),
2929            )
2930            .await,
2931        };
2932
2933        Ok(client)
2934    }
2935
2936    /// The [`EventCache`] instance for this [`Client`].
2937    pub fn event_cache(&self) -> &EventCache {
2938        // SAFETY: always initialized in the `Client` ctor.
2939        self.inner.event_cache.get().unwrap()
2940    }
2941
2942    /// The [`LatestEvents`] instance for this [`Client`].
2943    pub async fn latest_events(&self) -> &LatestEvents {
2944        self.inner
2945            .latest_events
2946            .get_or_init(|| async {
2947                LatestEvents::new(
2948                    WeakClient::from_client(self),
2949                    self.event_cache().clone(),
2950                    SendQueue::new(self.clone()),
2951                )
2952            })
2953            .await
2954    }
2955
2956    /// Waits until an at least partially synced room is received, and returns
2957    /// it.
2958    ///
2959    /// **Note: this function will loop endlessly until either it finds the room
2960    /// or an externally set timeout happens.**
2961    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2962        loop {
2963            if let Some(room) = self.get_room(room_id) {
2964                if room.is_state_partially_or_fully_synced() {
2965                    debug!("Found just created room!");
2966                    return room;
2967                }
2968                debug!("Room wasn't partially synced, waiting for sync beat to try again");
2969            } else {
2970                debug!("Room wasn't found, waiting for sync beat to try again");
2971            }
2972            self.inner.sync_beat.listen().await;
2973        }
2974    }
2975
2976    /// Knock on a room given its `room_id_or_alias` to ask for permission to
2977    /// join it.
2978    pub async fn knock(
2979        &self,
2980        room_id_or_alias: OwnedRoomOrAliasId,
2981        reason: Option<String>,
2982        server_names: Vec<OwnedServerName>,
2983    ) -> Result<Room> {
2984        let request =
2985            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2986        let response = self.send(request).await?;
2987        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2988        Ok(Room::new(self.clone(), base_room))
2989    }
2990
2991    /// Checks whether the provided `user_id` belongs to an ignored user.
2992    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2993        self.base_client().is_user_ignored(user_id).await
2994    }
2995
2996    /// Gets the `max_upload_size` value from the homeserver, getting either a
2997    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2998    /// missing.
2999    ///
3000    /// Check the spec for more info:
3001    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3002    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3003        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3004        if let Some(data) = max_upload_size_lock.get() {
3005            return Ok(data.to_owned());
3006        }
3007
3008        // Use the authenticated endpoint when the server supports it.
3009        let supported_versions = self.supported_versions().await?;
3010        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3011            .is_supported(&supported_versions);
3012
3013        let upload_size = if use_auth {
3014            self.send(authenticated_media::get_media_config::v1::Request::default())
3015                .await?
3016                .upload_size
3017        } else {
3018            #[allow(deprecated)]
3019            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3020        };
3021
3022        match max_upload_size_lock.set(upload_size) {
3023            Ok(_) => Ok(upload_size),
3024            Err(error) => {
3025                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3026            }
3027        }
3028    }
3029
3030    /// The settings to use for decrypting events.
3031    #[cfg(feature = "e2e-encryption")]
3032    pub fn decryption_settings(&self) -> &DecryptionSettings {
3033        &self.base_client().decryption_settings
3034    }
3035
3036    /// Returns the [`SearchIndex`] for this [`Client`].
3037    #[cfg(feature = "experimental-search")]
3038    pub fn search_index(&self) -> &SearchIndex {
3039        &self.inner.search_index
3040    }
3041
3042    /// Whether the client is configured to take thread subscriptions (MSC4306
3043    /// and MSC4308) into account.
3044    ///
3045    /// This may cause filtering out of thread subscriptions, and loading the
3046    /// thread subscriptions via the sliding sync extension, when the room
3047    /// list service is being used.
3048    pub fn enabled_thread_subscriptions(&self) -> bool {
3049        match self.base_client().threading_support {
3050            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
3051            ThreadingSupport::Disabled => false,
3052        }
3053    }
3054
3055    /// Fetch thread subscriptions changes between `from` and up to `to`.
3056    ///
3057    /// The `limit` optional parameter can be used to limit the number of
3058    /// entries in a response. It can also be overridden by the server, if
3059    /// it's deemed too large.
3060    pub async fn fetch_thread_subscriptions(
3061        &self,
3062        from: Option<String>,
3063        to: Option<String>,
3064        limit: Option<UInt>,
3065    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3066        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3067            from,
3068            to,
3069            limit,
3070        });
3071        Ok(self.send(request).await?)
3072    }
3073
3074    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3075        self.inner.thread_subscription_catchup.get().unwrap()
3076    }
3077}
3078
3079#[cfg(any(feature = "testing", test))]
3080impl Client {
3081    /// Test helper to mark users as tracked by the crypto layer.
3082    #[cfg(feature = "e2e-encryption")]
3083    pub async fn update_tracked_users_for_testing(
3084        &self,
3085        user_ids: impl IntoIterator<Item = &UserId>,
3086    ) {
3087        let olm = self.olm_machine().await;
3088        let olm = olm.as_ref().unwrap();
3089        olm.update_tracked_users(user_ids).await.unwrap();
3090    }
3091}
3092
3093/// A weak reference to the inner client, useful when trying to get a handle
3094/// on the owning client.
3095#[derive(Clone, Debug)]
3096pub(crate) struct WeakClient {
3097    client: Weak<ClientInner>,
3098}
3099
3100impl WeakClient {
3101    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3102    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3103        Self { client: Arc::downgrade(client) }
3104    }
3105
3106    /// Construct a [`WeakClient`] from a [`Client`].
3107    pub fn from_client(client: &Client) -> Self {
3108        Self::from_inner(&client.inner)
3109    }
3110
3111    /// Attempts to get a [`Client`] from this [`WeakClient`].
3112    pub fn get(&self) -> Option<Client> {
3113        self.client.upgrade().map(|inner| Client { inner })
3114    }
3115
3116    /// Gets the number of strong (`Arc`) pointers still pointing to this
3117    /// client.
3118    #[allow(dead_code)]
3119    pub fn strong_count(&self) -> usize {
3120        self.client.strong_count()
3121    }
3122}
3123
3124#[derive(Clone)]
3125struct ClientServerInfo {
3126    /// The Matrix versions and unstable features the server supports (known
3127    /// ones only).
3128    supported_versions: CachedValue<SupportedVersions>,
3129
3130    /// The server's well-known file, if any.
3131    well_known: CachedValue<Option<WellKnownResponse>>,
3132}
3133
3134/// A cached value that can either be set or not set, used to avoid confusion
3135/// between a value that is set to `None` (because it doesn't exist) and a value
3136/// that has not been cached yet.
3137#[derive(Clone)]
3138enum CachedValue<Value> {
3139    /// A value has been cached.
3140    Cached(Value),
3141    /// Nothing has been cached yet.
3142    NotSet,
3143}
3144
3145impl<Value> CachedValue<Value> {
3146    /// Unwraps the cached value, returning it if it exists.
3147    ///
3148    /// # Panics
3149    ///
3150    /// If the cached value is not set, this will panic.
3151    fn unwrap_cached_value(self) -> Value {
3152        match self {
3153            CachedValue::Cached(value) => value,
3154            CachedValue::NotSet => panic!("Tried to unwrap a cached value that wasn't set"),
3155        }
3156    }
3157
3158    /// Converts from `&CachedValue<Value>` to `CachedValue<&Value>`.
3159    fn as_ref(&self) -> CachedValue<&Value> {
3160        match self {
3161            Self::Cached(value) => CachedValue::Cached(value),
3162            Self::NotSet => CachedValue::NotSet,
3163        }
3164    }
3165
3166    /// Maps a `CachedValue<Value>` to `CachedValue<Other>` by applying a
3167    /// function to a contained value (if `Cached`) or returns `NotSet` (if
3168    /// `NotSet`).
3169    fn map<Other, F>(self, f: F) -> CachedValue<Other>
3170    where
3171        F: FnOnce(Value) -> Other,
3172    {
3173        match self {
3174            Self::Cached(value) => CachedValue::Cached(f(value)),
3175            Self::NotSet => CachedValue::NotSet,
3176        }
3177    }
3178}
3179
3180/// Information about the state of a room before we joined it.
3181#[derive(Debug, Clone, Default)]
3182struct PreJoinRoomInfo {
3183    /// The user who invited us to the room, if any.
3184    pub inviter: Option<RoomMember>,
3185}
3186
3187// The http mocking library is not supported for wasm32
3188#[cfg(all(test, not(target_family = "wasm")))]
3189pub(crate) mod tests {
3190    use std::{sync::Arc, time::Duration};
3191
3192    use assert_matches::assert_matches;
3193    use assert_matches2::assert_let;
3194    use eyeball::SharedObservable;
3195    use futures_util::{FutureExt, StreamExt, pin_mut};
3196    use js_int::{UInt, uint};
3197    use matrix_sdk_base::{
3198        RoomState,
3199        store::{MemoryStore, StoreConfig},
3200    };
3201    use matrix_sdk_test::{
3202        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3203        event_factory::EventFactory,
3204    };
3205    #[cfg(target_family = "wasm")]
3206    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3207
3208    use ruma::{
3209        RoomId, ServerName, UserId,
3210        api::{
3211            FeatureFlag, MatrixVersion,
3212            client::{
3213                discovery::discover_homeserver::RtcFocusInfo,
3214                room::create_room::v3::Request as CreateRoomRequest,
3215            },
3216        },
3217        assign,
3218        events::{
3219            ignored_user_list::IgnoredUserListEventContent,
3220            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3221        },
3222        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3223    };
3224    use serde_json::json;
3225    use stream_assert::{assert_next_matches, assert_pending};
3226    use tokio::{
3227        spawn,
3228        time::{sleep, timeout},
3229    };
3230    use url::Url;
3231
3232    use super::Client;
3233    use crate::{
3234        Error, TransmissionProgress,
3235        client::{WeakClient, futures::SendMediaUploadRequest},
3236        config::{RequestConfig, SyncSettings},
3237        futures::SendRequest,
3238        media::MediaError,
3239        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3240    };
3241
3242    #[async_test]
3243    async fn test_account_data() {
3244        let server = MatrixMockServer::new().await;
3245        let client = server.client_builder().build().await;
3246
3247        let f = EventFactory::new();
3248        server
3249            .mock_sync()
3250            .ok_and_run(&client, |builder| {
3251                builder.add_global_account_data(
3252                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3253                );
3254            })
3255            .await;
3256
3257        let content = client
3258            .account()
3259            .account_data::<IgnoredUserListEventContent>()
3260            .await
3261            .unwrap()
3262            .unwrap()
3263            .deserialize()
3264            .unwrap();
3265
3266        assert_eq!(content.ignored_users.len(), 1);
3267    }
3268
3269    #[async_test]
3270    async fn test_successful_discovery() {
3271        // Imagine this is `matrix.org`.
3272        let server = MatrixMockServer::new().await;
3273        let server_url = server.uri();
3274
3275        // Imagine this is `matrix-client.matrix.org`.
3276        let homeserver = MatrixMockServer::new().await;
3277        let homeserver_url = homeserver.uri();
3278
3279        // Imagine Alice has the user ID `@alice:matrix.org`.
3280        let domain = server_url.strip_prefix("http://").unwrap();
3281        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3282
3283        // The `.well-known` is on the server (e.g. `matrix.org`).
3284        server
3285            .mock_well_known()
3286            .ok_with_homeserver_url(&homeserver_url)
3287            .mock_once()
3288            .named("well-known")
3289            .mount()
3290            .await;
3291
3292        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3293        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3294
3295        let client = Client::builder()
3296            .insecure_server_name_no_tls(alice.server_name())
3297            .build()
3298            .await
3299            .unwrap();
3300
3301        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3302        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3303        client.server_versions().await.unwrap();
3304    }
3305
3306    #[async_test]
3307    async fn test_discovery_broken_server() {
3308        let server = MatrixMockServer::new().await;
3309        let server_url = server.uri();
3310        let domain = server_url.strip_prefix("http://").unwrap();
3311        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3312
3313        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3314
3315        assert!(
3316            Client::builder()
3317                .insecure_server_name_no_tls(alice.server_name())
3318                .build()
3319                .await
3320                .is_err(),
3321            "Creating a client from a user ID should fail when the .well-known request fails."
3322        );
3323    }
3324
3325    #[async_test]
3326    async fn test_room_creation() {
3327        let server = MatrixMockServer::new().await;
3328        let client = server.client_builder().build().await;
3329
3330        server
3331            .mock_sync()
3332            .ok_and_run(&client, |builder| {
3333                builder.add_joined_room(
3334                    JoinedRoomBuilder::default()
3335                        .add_state_event(StateTestEvent::Member)
3336                        .add_state_event(StateTestEvent::PowerLevels),
3337                );
3338            })
3339            .await;
3340
3341        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3342        assert_eq!(room.state(), RoomState::Joined);
3343    }
3344
3345    #[async_test]
3346    async fn test_retry_limit_http_requests() {
3347        let server = MatrixMockServer::new().await;
3348        let client = server
3349            .client_builder()
3350            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3351            .build()
3352            .await;
3353
3354        assert!(client.request_config().retry_limit.unwrap() == 3);
3355
3356        server.mock_login().error500().expect(3).mount().await;
3357
3358        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3359    }
3360
3361    #[async_test]
3362    async fn test_retry_timeout_http_requests() {
3363        // Keep this timeout small so that the test doesn't take long
3364        let retry_timeout = Duration::from_secs(5);
3365        let server = MatrixMockServer::new().await;
3366        let client = server
3367            .client_builder()
3368            .on_builder(|builder| {
3369                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3370            })
3371            .build()
3372            .await;
3373
3374        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3375
3376        server.mock_login().error500().expect(2..).mount().await;
3377
3378        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3379    }
3380
3381    #[async_test]
3382    async fn test_short_retry_initial_http_requests() {
3383        let server = MatrixMockServer::new().await;
3384        let client = server
3385            .client_builder()
3386            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3387            .build()
3388            .await;
3389
3390        server.mock_login().error500().expect(3..).mount().await;
3391
3392        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3393    }
3394
3395    #[async_test]
3396    async fn test_no_retry_http_requests() {
3397        let server = MatrixMockServer::new().await;
3398        let client = server.client_builder().build().await;
3399
3400        server.mock_devices().error500().mock_once().mount().await;
3401
3402        client.devices().await.unwrap_err();
3403    }
3404
3405    #[async_test]
3406    async fn test_set_homeserver() {
3407        let client = MockClientBuilder::new(None).build().await;
3408        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3409
3410        let homeserver = Url::parse("http://example.com/").unwrap();
3411        client.set_homeserver(homeserver.clone());
3412        assert_eq!(client.homeserver(), homeserver);
3413    }
3414
3415    #[async_test]
3416    async fn test_search_user_request() {
3417        let server = MatrixMockServer::new().await;
3418        let client = server.client_builder().build().await;
3419
3420        server.mock_user_directory().ok().mock_once().mount().await;
3421
3422        let response = client.search_users("test", 50).await.unwrap();
3423        assert_eq!(response.results.len(), 1);
3424        let result = &response.results[0];
3425        assert_eq!(result.user_id.to_string(), "@test:example.me");
3426        assert_eq!(result.display_name.clone().unwrap(), "Test");
3427        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3428        assert!(!response.limited);
3429    }
3430
3431    #[async_test]
3432    async fn test_request_unstable_features() {
3433        let server = MatrixMockServer::new().await;
3434        let client = server.client_builder().no_server_versions().build().await;
3435
3436        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3437
3438        let unstable_features = client.unstable_features().await.unwrap();
3439        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3440        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3441    }
3442
3443    #[async_test]
3444    async fn test_can_homeserver_push_encrypted_event_to_device() {
3445        let server = MatrixMockServer::new().await;
3446        let client = server.client_builder().no_server_versions().build().await;
3447
3448        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3449
3450        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3451        assert!(msc4028_enabled);
3452    }
3453
3454    #[async_test]
3455    async fn test_recently_visited_rooms() {
3456        // Tracking recently visited rooms requires authentication
3457        let client = MockClientBuilder::new(None).unlogged().build().await;
3458        assert_matches!(
3459            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3460            Err(Error::AuthenticationRequired)
3461        );
3462
3463        let client = MockClientBuilder::new(None).build().await;
3464        let account = client.account();
3465
3466        // We should start off with an empty list
3467        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3468
3469        // Tracking a valid room id should add it to the list
3470        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3471        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3472        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3473
3474        // And the existing list shouldn't be changed
3475        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3476        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3477
3478        // Tracking the same room again shouldn't change the list
3479        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3480        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3481        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3482
3483        // Tracking a second room should add it to the front of the list
3484        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3485        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3486        assert_eq!(
3487            account.get_recently_visited_rooms().await.unwrap(),
3488            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3489        );
3490
3491        // Tracking the first room yet again should move it to the front of the list
3492        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3493        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3494        assert_eq!(
3495            account.get_recently_visited_rooms().await.unwrap(),
3496            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3497        );
3498
3499        // Tracking should be capped at 20
3500        for n in 0..20 {
3501            account
3502                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3503                .await
3504                .unwrap();
3505        }
3506
3507        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3508
3509        // And the initial rooms should've been pushed out
3510        let rooms = account.get_recently_visited_rooms().await.unwrap();
3511        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3512        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3513
3514        // And the last tracked room should be the first
3515        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3516    }
3517
3518    #[async_test]
3519    async fn test_client_no_cycle_with_event_cache() {
3520        let client = MockClientBuilder::new(None).build().await;
3521
3522        // Wait for the init tasks to die.
3523        sleep(Duration::from_secs(1)).await;
3524
3525        let weak_client = WeakClient::from_client(&client);
3526        assert_eq!(weak_client.strong_count(), 1);
3527
3528        {
3529            let room_id = room_id!("!room:example.org");
3530
3531            // Have the client know the room.
3532            let response = SyncResponseBuilder::default()
3533                .add_joined_room(JoinedRoomBuilder::new(room_id))
3534                .build_sync_response();
3535            client.inner.base_client.receive_sync_response(response).await.unwrap();
3536
3537            client.event_cache().subscribe().unwrap();
3538
3539            let (_room_event_cache, _drop_handles) =
3540                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3541        }
3542
3543        drop(client);
3544
3545        // Give a bit of time for background tasks to die.
3546        sleep(Duration::from_secs(1)).await;
3547
3548        // The weak client must be the last reference to the client now.
3549        assert_eq!(weak_client.strong_count(), 0);
3550        let client = weak_client.get();
3551        assert!(
3552            client.is_none(),
3553            "too many strong references to the client: {}",
3554            Arc::strong_count(&client.unwrap().inner)
3555        );
3556    }
3557
3558    #[async_test]
3559    async fn test_server_info_caching() {
3560        let server = MatrixMockServer::new().await;
3561        let server_url = server.uri();
3562        let domain = server_url.strip_prefix("http://").unwrap();
3563        let server_name = <&ServerName>::try_from(domain).unwrap();
3564        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3565
3566        let well_known_mock = server
3567            .mock_well_known()
3568            .ok()
3569            .named("well known mock")
3570            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3571            .mount_as_scoped()
3572            .await;
3573
3574        let versions_mock = server
3575            .mock_versions()
3576            .ok_with_unstable_features()
3577            .named("first versions mock")
3578            .expect(1)
3579            .mount_as_scoped()
3580            .await;
3581
3582        let memory_store = Arc::new(MemoryStore::new());
3583        let client = Client::builder()
3584            .insecure_server_name_no_tls(server_name)
3585            .store_config(
3586                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3587                    .state_store(memory_store.clone()),
3588            )
3589            .build()
3590            .await
3591            .unwrap();
3592
3593        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3594
3595        // These subsequent calls hit the in-memory cache.
3596        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3597        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3598
3599        drop(client);
3600
3601        let client = server
3602            .client_builder()
3603            .no_server_versions()
3604            .on_builder(|builder| {
3605                builder.store_config(
3606                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3607                        .state_store(memory_store.clone()),
3608                )
3609            })
3610            .build()
3611            .await;
3612
3613        // These calls to the new client hit the on-disk cache.
3614        assert!(
3615            client
3616                .unstable_features()
3617                .await
3618                .unwrap()
3619                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3620        );
3621        let supported = client.supported_versions().await.unwrap();
3622        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3623        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3624
3625        // Then this call hits the in-memory cache.
3626        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3627
3628        drop(versions_mock);
3629        drop(well_known_mock);
3630
3631        // Now, reset the cache, and observe the endpoints being called again once.
3632        client.reset_server_info().await.unwrap();
3633
3634        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3635
3636        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3637
3638        // Hits network again.
3639        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3640        // Hits in-memory cache again.
3641        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3642        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3643    }
3644
3645    #[async_test]
3646    async fn test_server_info_without_a_well_known() {
3647        let server = MatrixMockServer::new().await;
3648        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3649
3650        let versions_mock = server
3651            .mock_versions()
3652            .ok_with_unstable_features()
3653            .named("first versions mock")
3654            .expect(1)
3655            .mount_as_scoped()
3656            .await;
3657
3658        let memory_store = Arc::new(MemoryStore::new());
3659        let client = server
3660            .client_builder()
3661            .no_server_versions()
3662            .on_builder(|builder| {
3663                builder.store_config(
3664                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3665                        .state_store(memory_store.clone()),
3666                )
3667            })
3668            .build()
3669            .await;
3670
3671        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3672
3673        // These subsequent calls hit the in-memory cache.
3674        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3675        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3676
3677        drop(client);
3678
3679        let client = server
3680            .client_builder()
3681            .no_server_versions()
3682            .on_builder(|builder| {
3683                builder.store_config(
3684                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3685                        .state_store(memory_store.clone()),
3686                )
3687            })
3688            .build()
3689            .await;
3690
3691        // This call to the new client hits the on-disk cache.
3692        assert!(
3693            client
3694                .unstable_features()
3695                .await
3696                .unwrap()
3697                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3698        );
3699
3700        // Then this call hits the in-memory cache.
3701        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3702
3703        drop(versions_mock);
3704
3705        // Now, reset the cache, and observe the endpoints being called again once.
3706        client.reset_server_info().await.unwrap();
3707
3708        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3709
3710        // Hits network again.
3711        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3712        // Hits in-memory cache again.
3713        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3714        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3715    }
3716
3717    #[async_test]
3718    async fn test_no_network_doesnt_cause_infinite_retries() {
3719        // We want infinite retries for transient errors.
3720        let client = MockClientBuilder::new(None)
3721            .on_builder(|builder| builder.request_config(RequestConfig::new()))
3722            .build()
3723            .await;
3724
3725        // We don't define a mock server on purpose here, so that the error is really a
3726        // network error.
3727        client.whoami().await.unwrap_err();
3728    }
3729
3730    #[async_test]
3731    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3732        let server = MatrixMockServer::new().await;
3733        let client = server.client_builder().build().await;
3734
3735        let room_id = room_id!("!room:example.org");
3736
3737        server
3738            .mock_sync()
3739            .ok_and_run(&client, |builder| {
3740                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3741            })
3742            .await;
3743
3744        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3745        assert_eq!(room.room_id(), room_id);
3746    }
3747
3748    #[async_test]
3749    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3750        let server = MatrixMockServer::new().await;
3751        let client = server.client_builder().build().await;
3752
3753        let room_id = room_id!("!room:example.org");
3754
3755        let client = Arc::new(client);
3756
3757        // Perform the /sync request with a delay so it starts after the
3758        // `await_room_remote_echo` call has happened
3759        spawn({
3760            let client = client.clone();
3761            async move {
3762                sleep(Duration::from_millis(100)).await;
3763
3764                server
3765                    .mock_sync()
3766                    .ok_and_run(&client, |builder| {
3767                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3768                    })
3769                    .await;
3770            }
3771        });
3772
3773        let room =
3774            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3775        assert_eq!(room.room_id(), room_id);
3776    }
3777
3778    #[async_test]
3779    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3780        let client = MockClientBuilder::new(None).build().await;
3781
3782        let room_id = room_id!("!room:example.org");
3783        // Room is not present so the client won't be able to find it. The call will
3784        // timeout.
3785        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3786    }
3787
3788    #[async_test]
3789    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3790        let server = MatrixMockServer::new().await;
3791        let client = server.client_builder().build().await;
3792
3793        server.mock_create_room().ok().mount().await;
3794
3795        // Create a room in the internal store
3796        let room = client
3797            .create_room(assign!(CreateRoomRequest::new(), {
3798                invite: vec![],
3799                is_direct: false,
3800            }))
3801            .await
3802            .unwrap();
3803
3804        // Room is locally present, but not synced, the call will timeout
3805        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3806            .await
3807            .unwrap_err();
3808    }
3809
3810    #[async_test]
3811    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3812        let server = MatrixMockServer::new().await;
3813        let client = server.client_builder().build().await;
3814
3815        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3816
3817        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3818        assert_matches!(ret, Ok(true));
3819    }
3820
3821    #[async_test]
3822    async fn test_is_room_alias_available_if_alias_is_resolved() {
3823        let server = MatrixMockServer::new().await;
3824        let client = server.client_builder().build().await;
3825
3826        server
3827            .mock_room_directory_resolve_alias()
3828            .ok("!some_room_id:matrix.org", Vec::new())
3829            .expect(1)
3830            .mount()
3831            .await;
3832
3833        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3834        assert_matches!(ret, Ok(false));
3835    }
3836
3837    #[async_test]
3838    async fn test_is_room_alias_available_if_error_found() {
3839        let server = MatrixMockServer::new().await;
3840        let client = server.client_builder().build().await;
3841
3842        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3843
3844        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3845        assert_matches!(ret, Err(_));
3846    }
3847
3848    #[async_test]
3849    async fn test_create_room_alias() {
3850        let server = MatrixMockServer::new().await;
3851        let client = server.client_builder().build().await;
3852
3853        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3854
3855        let ret = client
3856            .create_room_alias(
3857                room_alias_id!("#some_alias:matrix.org"),
3858                room_id!("!some_room:matrix.org"),
3859            )
3860            .await;
3861        assert_matches!(ret, Ok(()));
3862    }
3863
3864    #[async_test]
3865    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3866        let server = MatrixMockServer::new().await;
3867        let client = server.client_builder().build().await;
3868
3869        let room_id = room_id!("!a-room:matrix.org");
3870
3871        // Make sure the summary endpoint is called once
3872        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3873
3874        // We create a locally cached invited room
3875        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3876
3877        // And we get a preview, the server endpoint was reached
3878        let preview = client
3879            .get_room_preview(room_id.into(), Vec::new())
3880            .await
3881            .expect("Room preview should be retrieved");
3882
3883        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3884    }
3885
3886    #[async_test]
3887    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3888        let server = MatrixMockServer::new().await;
3889        let client = server.client_builder().build().await;
3890
3891        let room_id = room_id!("!a-room:matrix.org");
3892
3893        // Make sure the summary endpoint is called once
3894        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3895
3896        // We create a locally cached left room
3897        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3898
3899        // And we get a preview, the server endpoint was reached
3900        let preview = client
3901            .get_room_preview(room_id.into(), Vec::new())
3902            .await
3903            .expect("Room preview should be retrieved");
3904
3905        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3906    }
3907
3908    #[async_test]
3909    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3910        let server = MatrixMockServer::new().await;
3911        let client = server.client_builder().build().await;
3912
3913        let room_id = room_id!("!a-room:matrix.org");
3914
3915        // Make sure the summary endpoint is called once
3916        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3917
3918        // We create a locally cached knocked room
3919        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3920
3921        // And we get a preview, the server endpoint was reached
3922        let preview = client
3923            .get_room_preview(room_id.into(), Vec::new())
3924            .await
3925            .expect("Room preview should be retrieved");
3926
3927        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3928    }
3929
3930    #[async_test]
3931    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3932        let server = MatrixMockServer::new().await;
3933        let client = server.client_builder().build().await;
3934
3935        let room_id = room_id!("!a-room:matrix.org");
3936
3937        // Make sure the summary endpoint is not called
3938        server.mock_room_summary().ok(room_id).never().mount().await;
3939
3940        // We create a locally cached joined room
3941        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3942
3943        // And we get a preview, no server endpoint was reached
3944        let preview = client
3945            .get_room_preview(room_id.into(), Vec::new())
3946            .await
3947            .expect("Room preview should be retrieved");
3948
3949        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3950    }
3951
3952    #[async_test]
3953    async fn test_media_preview_config() {
3954        let server = MatrixMockServer::new().await;
3955        let client = server.client_builder().build().await;
3956
3957        server
3958            .mock_sync()
3959            .ok_and_run(&client, |builder| {
3960                builder.add_custom_global_account_data(json!({
3961                    "content": {
3962                        "media_previews": "private",
3963                        "invite_avatars": "off"
3964                    },
3965                    "type": "m.media_preview_config"
3966                }));
3967            })
3968            .await;
3969
3970        let (initial_value, stream) =
3971            client.account().observe_media_preview_config().await.unwrap();
3972
3973        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3974        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3975        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3976        pin_mut!(stream);
3977        assert_pending!(stream);
3978
3979        server
3980            .mock_sync()
3981            .ok_and_run(&client, |builder| {
3982                builder.add_custom_global_account_data(json!({
3983                    "content": {
3984                        "media_previews": "off",
3985                        "invite_avatars": "on"
3986                    },
3987                    "type": "m.media_preview_config"
3988                }));
3989            })
3990            .await;
3991
3992        assert_next_matches!(
3993            stream,
3994            MediaPreviewConfigEventContent {
3995                media_previews: Some(MediaPreviews::Off),
3996                invite_avatars: Some(InviteAvatars::On),
3997                ..
3998            }
3999        );
4000        assert_pending!(stream);
4001    }
4002
4003    #[async_test]
4004    async fn test_unstable_media_preview_config() {
4005        let server = MatrixMockServer::new().await;
4006        let client = server.client_builder().build().await;
4007
4008        server
4009            .mock_sync()
4010            .ok_and_run(&client, |builder| {
4011                builder.add_custom_global_account_data(json!({
4012                    "content": {
4013                        "media_previews": "private",
4014                        "invite_avatars": "off"
4015                    },
4016                    "type": "io.element.msc4278.media_preview_config"
4017                }));
4018            })
4019            .await;
4020
4021        let (initial_value, stream) =
4022            client.account().observe_media_preview_config().await.unwrap();
4023
4024        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4025        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4026        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4027        pin_mut!(stream);
4028        assert_pending!(stream);
4029
4030        server
4031            .mock_sync()
4032            .ok_and_run(&client, |builder| {
4033                builder.add_custom_global_account_data(json!({
4034                    "content": {
4035                        "media_previews": "off",
4036                        "invite_avatars": "on"
4037                    },
4038                    "type": "io.element.msc4278.media_preview_config"
4039                }));
4040            })
4041            .await;
4042
4043        assert_next_matches!(
4044            stream,
4045            MediaPreviewConfigEventContent {
4046                media_previews: Some(MediaPreviews::Off),
4047                invite_avatars: Some(InviteAvatars::On),
4048                ..
4049            }
4050        );
4051        assert_pending!(stream);
4052    }
4053
4054    #[async_test]
4055    async fn test_media_preview_config_not_found() {
4056        let server = MatrixMockServer::new().await;
4057        let client = server.client_builder().build().await;
4058
4059        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4060
4061        assert!(initial_value.is_none());
4062    }
4063
4064    #[async_test]
4065    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4066        // The default Matrix version we use is 1.11 or higher, so authenticated media
4067        // is supported.
4068        let server = MatrixMockServer::new().await;
4069        let client = server.client_builder().build().await;
4070
4071        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4072
4073        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4074        client.load_or_fetch_max_upload_size().await.unwrap();
4075
4076        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4077    }
4078
4079    #[async_test]
4080    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4081        // The server must advertise support for the stable feature for authenticated
4082        // media support, so we mock the `GET /versions` response.
4083        let server = MatrixMockServer::new().await;
4084        let client = server.client_builder().no_server_versions().build().await;
4085
4086        server
4087            .mock_versions()
4088            .ok_custom(
4089                &["v1.7", "v1.8", "v1.9", "v1.10"],
4090                &[("org.matrix.msc3916.stable", true)].into(),
4091            )
4092            .named("versions")
4093            .expect(1)
4094            .mount()
4095            .await;
4096
4097        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4098
4099        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4100        client.load_or_fetch_max_upload_size().await.unwrap();
4101
4102        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4103    }
4104
4105    #[async_test]
4106    async fn test_load_or_fetch_max_upload_size_no_auth() {
4107        // The server must not support Matrix 1.11 or higher for unauthenticated
4108        // media requests, so we mock the `GET /versions` response.
4109        let server = MatrixMockServer::new().await;
4110        let client = server.client_builder().no_server_versions().build().await;
4111
4112        server
4113            .mock_versions()
4114            .ok_custom(&["v1.1"], &Default::default())
4115            .named("versions")
4116            .expect(1)
4117            .mount()
4118            .await;
4119
4120        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4121
4122        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4123        client.load_or_fetch_max_upload_size().await.unwrap();
4124
4125        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4126    }
4127
4128    #[async_test]
4129    async fn test_uploading_a_too_large_media_file() {
4130        let server = MatrixMockServer::new().await;
4131        let client = server.client_builder().build().await;
4132
4133        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4134        client.load_or_fetch_max_upload_size().await.unwrap();
4135        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4136
4137        let data = vec![1, 2];
4138        let upload_request =
4139            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4140        let request = SendRequest {
4141            client: client.clone(),
4142            request: upload_request,
4143            config: None,
4144            send_progress: SharedObservable::new(TransmissionProgress::default()),
4145        };
4146        let media_request = SendMediaUploadRequest::new(request);
4147
4148        let error = media_request.await.err();
4149        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4150        assert_eq!(max, uint!(1));
4151        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4152    }
4153
4154    #[async_test]
4155    async fn test_dont_ignore_timeout_on_first_sync() {
4156        let server = MatrixMockServer::new().await;
4157        let client = server.client_builder().build().await;
4158
4159        server
4160            .mock_sync()
4161            .timeout(Some(Duration::from_secs(30)))
4162            .ok(|_| {})
4163            .mock_once()
4164            .named("sync_with_timeout")
4165            .mount()
4166            .await;
4167
4168        // Call the endpoint once to check the timeout.
4169        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4170
4171        timeout(Duration::from_secs(1), async {
4172            stream.next().await.unwrap().unwrap();
4173        })
4174        .await
4175        .unwrap();
4176    }
4177
4178    #[async_test]
4179    async fn test_ignore_timeout_on_first_sync() {
4180        let server = MatrixMockServer::new().await;
4181        let client = server.client_builder().build().await;
4182
4183        server
4184            .mock_sync()
4185            .timeout(None)
4186            .ok(|_| {})
4187            .mock_once()
4188            .named("sync_no_timeout")
4189            .mount()
4190            .await;
4191        server
4192            .mock_sync()
4193            .timeout(Some(Duration::from_secs(30)))
4194            .ok(|_| {})
4195            .mock_once()
4196            .named("sync_with_timeout")
4197            .mount()
4198            .await;
4199
4200        // Call each version of the endpoint once to check the timeouts.
4201        let mut stream = Box::pin(
4202            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4203        );
4204
4205        timeout(Duration::from_secs(1), async {
4206            stream.next().await.unwrap().unwrap();
4207            stream.next().await.unwrap().unwrap();
4208        })
4209        .await
4210        .unwrap();
4211    }
4212
4213    #[async_test]
4214    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4215        let server = MatrixMockServer::new().await;
4216        let client = server.client_builder().build().await;
4217        // This is the user ID that is inside MemberAdditional.
4218        // Note the confusing username, so we can share
4219        // GlobalAccountDataTestEvent::Direct with the invited test.
4220        let user_id = user_id!("@invited:localhost");
4221
4222        // When we receive a sync response saying "invited" is invited to a DM
4223        let f = EventFactory::new();
4224        let response = SyncResponseBuilder::default()
4225            .add_joined_room(
4226                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
4227            )
4228            .add_global_account_data(
4229                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4230            )
4231            .build_sync_response();
4232        client.base_client().receive_sync_response(response).await.unwrap();
4233
4234        // Then get_dm_room finds this room
4235        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4236        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4237    }
4238
4239    #[async_test]
4240    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4241        let server = MatrixMockServer::new().await;
4242        let client = server.client_builder().build().await;
4243        // This is the user ID that is inside MemberInvite
4244        let user_id = user_id!("@invited:localhost");
4245
4246        // When we receive a sync response saying "invited" is invited to a DM
4247        let f = EventFactory::new();
4248        let response = SyncResponseBuilder::default()
4249            .add_joined_room(
4250                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
4251            )
4252            .add_global_account_data(
4253                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4254            )
4255            .build_sync_response();
4256        client.base_client().receive_sync_response(response).await.unwrap();
4257
4258        // Then get_dm_room finds this room
4259        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4260        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4261    }
4262
4263    #[async_test]
4264    async fn test_get_dm_room_still_finds_left_room() {
4265        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4266        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4267
4268        let server = MatrixMockServer::new().await;
4269        let client = server.client_builder().build().await;
4270        // This is the user ID that is inside MemberAdditional.
4271        // Note the confusing username, so we can share
4272        // GlobalAccountDataTestEvent::Direct with the invited test.
4273        let user_id = user_id!("@invited:localhost");
4274
4275        // When we receive a sync response saying "invited" is invited to a DM
4276        let f = EventFactory::new();
4277        let response = SyncResponseBuilder::default()
4278            .add_joined_room(
4279                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
4280            )
4281            .add_global_account_data(
4282                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4283            )
4284            .build_sync_response();
4285        client.base_client().receive_sync_response(response).await.unwrap();
4286
4287        // Then get_dm_room finds this room
4288        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4289        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4290    }
4291
4292    #[async_test]
4293    async fn test_device_exists() {
4294        let server = MatrixMockServer::new().await;
4295        let client = server.client_builder().build().await;
4296
4297        server.mock_get_device().ok().expect(1).mount().await;
4298
4299        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4300    }
4301
4302    #[async_test]
4303    async fn test_device_exists_404() {
4304        let server = MatrixMockServer::new().await;
4305        let client = server.client_builder().build().await;
4306
4307        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4308    }
4309
4310    #[async_test]
4311    async fn test_device_exists_500() {
4312        let server = MatrixMockServer::new().await;
4313        let client = server.client_builder().build().await;
4314
4315        server.mock_get_device().error500().expect(1).mount().await;
4316
4317        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4318    }
4319}