matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{btree_map, BTreeMap},
19    fmt::{self, Debug},
20    future::{ready, Future},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23};
24
25use caches::ClientCaches;
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::store::LockableCryptoStore;
32use matrix_sdk_base::{
33    event_cache::store::EventCacheStoreLock,
34    store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
35    sync::{Notification, RoomUpdates},
36    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
37    StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm,
38};
39use matrix_sdk_common::ttl_cache::TtlCache;
40#[cfg(feature = "e2e-encryption")]
41use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
42use ruma::{
43    api::{
44        client::{
45            account::whoami,
46            alias::{create_alias, delete_alias, get_alias},
47            device::{delete_devices, get_devices, update_device},
48            directory::{get_public_rooms, get_public_rooms_filtered},
49            discovery::{
50                discover_homeserver,
51                discover_homeserver::RtcFocusInfo,
52                get_capabilities::{self, Capabilities},
53                get_supported_versions,
54            },
55            error::ErrorKind,
56            filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
57            knock::knock_room,
58            membership::{join_room_by_id, join_room_by_id_or_alias},
59            room::create_room,
60            session::login::v3::DiscoveryInfo,
61            sync::sync_events,
62            uiaa,
63            user_directory::search_users,
64        },
65        error::FromHttpResponseError,
66        MatrixVersion, OutgoingRequest,
67    },
68    assign,
69    push::Ruleset,
70    time::Instant,
71    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
72    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
73};
74use serde::de::DeserializeOwned;
75use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
76use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
77use url::Url;
78
79use self::futures::SendRequest;
80use crate::{
81    authentication::{
82        matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
83        SaveSessionCallback,
84    },
85    config::RequestConfig,
86    deduplicating_handler::DeduplicatingHandler,
87    error::HttpResult,
88    event_cache::EventCache,
89    event_handler::{
90        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
91        EventHandlerStore, ObservableEventHandler, SyncEvent,
92    },
93    http_client::HttpClient,
94    media::MediaError,
95    notification_settings::NotificationSettings,
96    room_preview::RoomPreview,
97    send_queue::SendQueueData,
98    sliding_sync::Version as SlidingSyncVersion,
99    sync::{RoomUpdate, SyncResponse},
100    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
101    Room, SessionTokens, TransmissionProgress,
102};
103#[cfg(feature = "e2e-encryption")]
104use crate::{
105    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
106    store_locks::CrossProcessStoreLock,
107};
108
109mod builder;
110pub(crate) mod caches;
111pub(crate) mod futures;
112
113pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
114
115#[cfg(not(target_family = "wasm"))]
116type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
117#[cfg(target_family = "wasm")]
118type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
119
120#[cfg(not(target_family = "wasm"))]
121type NotificationHandlerFn =
122    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
123#[cfg(target_family = "wasm")]
124type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
125
126/// Enum controlling if a loop running callbacks should continue or abort.
127///
128/// This is mainly used in the [`sync_with_callback`] method, the return value
129/// of the provided callback controls if the sync loop should be exited.
130///
131/// [`sync_with_callback`]: #method.sync_with_callback
132#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133pub enum LoopCtrl {
134    /// Continue running the loop.
135    Continue,
136    /// Break out of the loop.
137    Break,
138}
139
140/// Represents changes that can occur to a `Client`s `Session`.
141#[derive(Debug, Clone, PartialEq)]
142pub enum SessionChange {
143    /// The session's token is no longer valid.
144    UnknownToken {
145        /// Whether or not the session was soft logged out
146        soft_logout: bool,
147    },
148    /// The session's tokens have been refreshed.
149    TokensRefreshed,
150}
151
152/// An async/await enabled Matrix client.
153///
154/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
155#[derive(Clone)]
156pub struct Client {
157    pub(crate) inner: Arc<ClientInner>,
158}
159
160#[derive(Default)]
161pub(crate) struct ClientLocks {
162    /// Lock ensuring that only a single room may be marked as a DM at once.
163    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
164    /// explanation.
165    pub(crate) mark_as_dm_lock: Mutex<()>,
166
167    /// Lock ensuring that only a single secret store is getting opened at the
168    /// same time.
169    ///
170    /// This is important so we don't accidentally create multiple different new
171    /// default secret storage keys.
172    #[cfg(feature = "e2e-encryption")]
173    pub(crate) open_secret_store_lock: Mutex<()>,
174
175    /// Lock ensuring that we're only storing a single secret at a time.
176    ///
177    /// Take a look at the [`SecretStore::put_secret`] method for a more
178    /// detailed explanation.
179    ///
180    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
181    #[cfg(feature = "e2e-encryption")]
182    pub(crate) store_secret_lock: Mutex<()>,
183
184    /// Lock ensuring that only one method at a time might modify our backup.
185    #[cfg(feature = "e2e-encryption")]
186    pub(crate) backup_modify_lock: Mutex<()>,
187
188    /// Lock ensuring that we're going to attempt to upload backups for a single
189    /// requester.
190    #[cfg(feature = "e2e-encryption")]
191    pub(crate) backup_upload_lock: Mutex<()>,
192
193    /// Handler making sure we only have one group session sharing request in
194    /// flight per room.
195    #[cfg(feature = "e2e-encryption")]
196    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
197
198    /// Lock making sure we're only doing one key claim request at a time.
199    #[cfg(feature = "e2e-encryption")]
200    pub(crate) key_claim_lock: Mutex<()>,
201
202    /// Handler to ensure that only one members request is running at a time,
203    /// given a room.
204    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
205
206    /// Handler to ensure that only one encryption state request is running at a
207    /// time, given a room.
208    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
209
210    /// Deduplicating handler for sending read receipts. The string is an
211    /// internal implementation detail, see [`Self::send_single_receipt`].
212    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
213
214    #[cfg(feature = "e2e-encryption")]
215    pub(crate) cross_process_crypto_store_lock:
216        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,
217
218    /// Latest "generation" of data known by the crypto store.
219    ///
220    /// This is a counter that only increments, set in the database (and can
221    /// wrap). It's incremented whenever some process acquires a lock for the
222    /// first time. *This assumes the crypto store lock is being held, to
223    /// avoid data races on writing to this value in the store*.
224    ///
225    /// The current process will maintain this value in local memory and in the
226    /// DB over time. Observing a different value than the one read in
227    /// memory, when reading from the store indicates that somebody else has
228    /// written into the database under our feet.
229    ///
230    /// TODO: this should live in the `OlmMachine`, since it's information
231    /// related to the lock. As of today (2023-07-28), we blow up the entire
232    /// olm machine when there's a generation mismatch. So storing the
233    /// generation in the olm machine would make the client think there's
234    /// *always* a mismatch, and that's why we need to store the generation
235    /// outside the `OlmMachine`.
236    #[cfg(feature = "e2e-encryption")]
237    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
238}
239
240pub(crate) struct ClientInner {
241    /// All the data related to authentication and authorization.
242    pub(crate) auth_ctx: Arc<AuthCtx>,
243
244    /// The URL of the server.
245    ///
246    /// Not to be confused with the `Self::homeserver`. `server` is usually
247    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
248    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
249    /// homeserver (at the time of writing — 2024-08-28).
250    ///
251    /// This value is optional depending on how the `Client` has been built.
252    /// If it's been built from a homeserver URL directly, we don't know the
253    /// server. However, if the `Client` has been built from a server URL or
254    /// name, then the homeserver has been discovered, and we know both.
255    server: Option<Url>,
256
257    /// The URL of the homeserver to connect to.
258    ///
259    /// This is the URL for the client-server Matrix API.
260    homeserver: StdRwLock<Url>,
261
262    /// The sliding sync version.
263    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
264
265    /// The underlying HTTP client.
266    pub(crate) http_client: HttpClient,
267
268    /// User session data.
269    pub(super) base_client: BaseClient,
270
271    /// Collection of in-memory caches for the [`Client`].
272    pub(crate) caches: ClientCaches,
273
274    /// Collection of locks individual client methods might want to use, either
275    /// to ensure that only a single call to a method happens at once or to
276    /// deduplicate multiple calls to a method.
277    pub(crate) locks: ClientLocks,
278
279    /// The cross-process store locks holder name.
280    ///
281    /// The SDK provides cross-process store locks (see
282    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
283    /// `holder_name` is the value used for all cross-process store locks
284    /// used by this `Client`.
285    ///
286    /// If multiple `Client`s are running in different processes, this
287    /// value MUST be different for each `Client`.
288    cross_process_store_locks_holder_name: String,
289
290    /// A mapping of the times at which the current user sent typing notices,
291    /// keyed by room.
292    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
293
294    /// Event handlers. See `add_event_handler`.
295    pub(crate) event_handlers: EventHandlerStore,
296
297    /// Notification handlers. See `register_notification_handler`.
298    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
299
300    /// The sender-side of channels used to receive room updates.
301    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
302
303    /// The sender-side of a channel used to observe all the room updates of a
304    /// sync response.
305    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
306
307    /// Whether the client should update its homeserver URL with the discovery
308    /// information present in the login response.
309    respect_login_well_known: bool,
310
311    /// An event that can be listened on to wait for a successful sync. The
312    /// event will only be fired if a sync loop is running. Can be used for
313    /// synchronization, e.g. if we send out a request to create a room, we can
314    /// wait for the sync to get the data to fetch a room object from the state
315    /// store.
316    pub(crate) sync_beat: event_listener::Event,
317
318    /// A central cache for events, inactive first.
319    ///
320    /// It becomes active when [`EventCache::subscribe`] is called.
321    pub(crate) event_cache: OnceCell<EventCache>,
322
323    /// End-to-end encryption related state.
324    #[cfg(feature = "e2e-encryption")]
325    pub(crate) e2ee: EncryptionData,
326
327    /// The verification state of our own device.
328    #[cfg(feature = "e2e-encryption")]
329    pub(crate) verification_state: SharedObservable<VerificationState>,
330
331    /// Whether to enable the experimental support for sending and receiving
332    /// encrypted room history on invite, per [MSC4268].
333    ///
334    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
335    #[cfg(feature = "e2e-encryption")]
336    pub(crate) enable_share_history_on_invite: bool,
337
338    /// Data related to the [`SendQueue`].
339    ///
340    /// [`SendQueue`]: crate::send_queue::SendQueue
341    pub(crate) send_queue_data: Arc<SendQueueData>,
342
343    /// The `max_upload_size` value of the homeserver, it contains the max
344    /// request size you can send.
345    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
346}
347
348impl ClientInner {
349    /// Create a new `ClientInner`.
350    ///
351    /// All the fields passed as parameters here are those that must be cloned
352    /// upon instantiation of a sub-client, e.g. a client specialized for
353    /// notifications.
354    #[allow(clippy::too_many_arguments)]
355    async fn new(
356        auth_ctx: Arc<AuthCtx>,
357        server: Option<Url>,
358        homeserver: Url,
359        sliding_sync_version: SlidingSyncVersion,
360        http_client: HttpClient,
361        base_client: BaseClient,
362        server_info: ClientServerInfo,
363        respect_login_well_known: bool,
364        event_cache: OnceCell<EventCache>,
365        send_queue: Arc<SendQueueData>,
366        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
367        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
368        cross_process_store_locks_holder_name: String,
369    ) -> Arc<Self> {
370        let caches = ClientCaches {
371            server_info: server_info.into(),
372            server_metadata: Mutex::new(TtlCache::new()),
373        };
374
375        let client = Self {
376            server,
377            homeserver: StdRwLock::new(homeserver),
378            auth_ctx,
379            sliding_sync_version: StdRwLock::new(sliding_sync_version),
380            http_client,
381            base_client,
382            caches,
383            locks: Default::default(),
384            cross_process_store_locks_holder_name,
385            typing_notice_times: Default::default(),
386            event_handlers: Default::default(),
387            notification_handlers: Default::default(),
388            room_update_channels: Default::default(),
389            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
390            // ballast for all observers to catch up.
391            room_updates_sender: broadcast::Sender::new(32),
392            respect_login_well_known,
393            sync_beat: event_listener::Event::new(),
394            event_cache,
395            send_queue_data: send_queue,
396            #[cfg(feature = "e2e-encryption")]
397            e2ee: EncryptionData::new(encryption_settings),
398            #[cfg(feature = "e2e-encryption")]
399            verification_state: SharedObservable::new(VerificationState::Unknown),
400            #[cfg(feature = "e2e-encryption")]
401            enable_share_history_on_invite,
402            server_max_upload_size: Mutex::new(OnceCell::new()),
403        };
404
405        #[allow(clippy::let_and_return)]
406        let client = Arc::new(client);
407
408        #[cfg(feature = "e2e-encryption")]
409        client.e2ee.initialize_room_key_tasks(&client);
410
411        let _ = client
412            .event_cache
413            .get_or_init(|| async {
414                EventCache::new(
415                    WeakClient::from_inner(&client),
416                    client.base_client.event_cache_store().clone(),
417                )
418            })
419            .await;
420
421        client
422    }
423}
424
425#[cfg(not(tarpaulin_include))]
426impl Debug for Client {
427    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
428        write!(fmt, "Client")
429    }
430}
431
432impl Client {
433    /// Create a new [`Client`] that will use the given homeserver.
434    ///
435    /// # Arguments
436    ///
437    /// * `homeserver_url` - The homeserver that the client should connect to.
438    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
439        Self::builder().homeserver_url(homeserver_url).build().await
440    }
441
442    /// Returns a subscriber that publishes an event every time the ignore user
443    /// list changes.
444    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
445        self.inner.base_client.subscribe_to_ignore_user_list_changes()
446    }
447
448    /// Create a new [`ClientBuilder`].
449    pub fn builder() -> ClientBuilder {
450        ClientBuilder::new()
451    }
452
453    pub(crate) fn base_client(&self) -> &BaseClient {
454        &self.inner.base_client
455    }
456
457    /// The underlying HTTP client.
458    pub fn http_client(&self) -> &reqwest::Client {
459        &self.inner.http_client.inner
460    }
461
462    pub(crate) fn locks(&self) -> &ClientLocks {
463        &self.inner.locks
464    }
465
466    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
467        &self.inner.auth_ctx
468    }
469
470    /// The cross-process store locks holder name.
471    ///
472    /// The SDK provides cross-process store locks (see
473    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
474    /// `holder_name` is the value used for all cross-process store locks
475    /// used by this `Client`.
476    pub fn cross_process_store_locks_holder_name(&self) -> &str {
477        &self.inner.cross_process_store_locks_holder_name
478    }
479
480    /// Change the homeserver URL used by this client.
481    ///
482    /// # Arguments
483    ///
484    /// * `homeserver_url` - The new URL to use.
485    fn set_homeserver(&self, homeserver_url: Url) {
486        *self.inner.homeserver.write().unwrap() = homeserver_url;
487    }
488
489    /// Get the capabilities of the homeserver.
490    ///
491    /// This method should be used to check what features are supported by the
492    /// homeserver.
493    ///
494    /// # Examples
495    ///
496    /// ```no_run
497    /// # use matrix_sdk::Client;
498    /// # use url::Url;
499    /// # async {
500    /// # let homeserver = Url::parse("http://example.com")?;
501    /// let client = Client::new(homeserver).await?;
502    ///
503    /// let capabilities = client.get_capabilities().await?;
504    ///
505    /// if capabilities.change_password.enabled {
506    ///     // Change password
507    /// }
508    /// # anyhow::Ok(()) };
509    /// ```
510    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
511        let res = self.send(get_capabilities::v3::Request::new()).await?;
512        Ok(res.capabilities)
513    }
514
515    /// Get a copy of the default request config.
516    ///
517    /// The default request config is what's used when sending requests if no
518    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
519    /// function with such a parameter.
520    ///
521    /// If the default request config was not customized through
522    /// [`ClientBuilder`] when creating this `Client`, the returned value will
523    /// be equivalent to [`RequestConfig::default()`].
524    pub fn request_config(&self) -> RequestConfig {
525        self.inner.http_client.request_config
526    }
527
528    /// Check whether the client has been activated.
529    ///
530    /// A client is considered active when:
531    ///
532    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
533    ///    is logged in,
534    /// 2. Has loaded cached data from storage,
535    /// 3. If encryption is enabled, it also initialized or restored its
536    ///    `OlmMachine`.
537    pub fn is_active(&self) -> bool {
538        self.inner.base_client.is_active()
539    }
540
541    /// The server used by the client.
542    ///
543    /// See `Self::server` to learn more.
544    pub fn server(&self) -> Option<&Url> {
545        self.inner.server.as_ref()
546    }
547
548    /// The homeserver of the client.
549    pub fn homeserver(&self) -> Url {
550        self.inner.homeserver.read().unwrap().clone()
551    }
552
553    /// Get the sliding sync version.
554    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
555        self.inner.sliding_sync_version.read().unwrap().clone()
556    }
557
558    /// Override the sliding sync version.
559    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
560        let mut lock = self.inner.sliding_sync_version.write().unwrap();
561        *lock = version;
562    }
563
564    /// Get the Matrix user session meta information.
565    ///
566    /// If the client is currently logged in, this will return a
567    /// [`SessionMeta`] object which contains the user ID and device ID.
568    /// Otherwise it returns `None`.
569    pub fn session_meta(&self) -> Option<&SessionMeta> {
570        self.base_client().session_meta()
571    }
572
573    /// Returns a receiver that gets events for each room info update. To watch
574    /// for new events, use `receiver.resubscribe()`. Each event contains the
575    /// room and a boolean whether this event should trigger a room list update.
576    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
577        self.base_client().room_info_notable_update_receiver()
578    }
579
580    /// Performs a search for users.
581    /// The search is performed case-insensitively on user IDs and display names
582    ///
583    /// # Arguments
584    ///
585    /// * `search_term` - The search term for the search
586    /// * `limit` - The maximum number of results to return. Defaults to 10.
587    ///
588    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
589    pub async fn search_users(
590        &self,
591        search_term: &str,
592        limit: u64,
593    ) -> HttpResult<search_users::v3::Response> {
594        let mut request = search_users::v3::Request::new(search_term.to_owned());
595
596        if let Some(limit) = UInt::new(limit) {
597            request.limit = limit;
598        }
599
600        self.send(request).await
601    }
602
603    /// Get the user id of the current owner of the client.
604    pub fn user_id(&self) -> Option<&UserId> {
605        self.session_meta().map(|s| s.user_id.as_ref())
606    }
607
608    /// Get the device ID that identifies the current session.
609    pub fn device_id(&self) -> Option<&DeviceId> {
610        self.session_meta().map(|s| s.device_id.as_ref())
611    }
612
613    /// Get the current access token for this session.
614    ///
615    /// Will be `None` if the client has not been logged in.
616    pub fn access_token(&self) -> Option<String> {
617        self.auth_ctx().access_token()
618    }
619
620    /// Get the current tokens for this session.
621    ///
622    /// To be notified of changes in the session tokens, use
623    /// [`Client::subscribe_to_session_changes()`] or
624    /// [`Client::set_session_callbacks()`].
625    ///
626    /// Returns `None` if the client has not been logged in.
627    pub fn session_tokens(&self) -> Option<SessionTokens> {
628        self.auth_ctx().session_tokens()
629    }
630
631    /// Access the authentication API used to log in this client.
632    ///
633    /// Will be `None` if the client has not been logged in.
634    pub fn auth_api(&self) -> Option<AuthApi> {
635        match self.auth_ctx().auth_data.get()? {
636            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
637            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
638        }
639    }
640
641    /// Get the whole session info of this client.
642    ///
643    /// Will be `None` if the client has not been logged in.
644    ///
645    /// Can be used with [`Client::restore_session`] to restore a previously
646    /// logged-in session.
647    pub fn session(&self) -> Option<AuthSession> {
648        match self.auth_api()? {
649            AuthApi::Matrix(api) => api.session().map(Into::into),
650            AuthApi::OAuth(api) => api.full_session().map(Into::into),
651        }
652    }
653
654    /// Get a reference to the state store.
655    pub fn state_store(&self) -> &DynStateStore {
656        self.base_client().state_store()
657    }
658
659    /// Get a reference to the event cache store.
660    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
661        self.base_client().event_cache_store()
662    }
663
664    /// Access the native Matrix authentication API with this client.
665    pub fn matrix_auth(&self) -> MatrixAuth {
666        MatrixAuth::new(self.clone())
667    }
668
669    /// Get the account of the current owner of the client.
670    pub fn account(&self) -> Account {
671        Account::new(self.clone())
672    }
673
674    /// Get the encryption manager of the client.
675    #[cfg(feature = "e2e-encryption")]
676    pub fn encryption(&self) -> Encryption {
677        Encryption::new(self.clone())
678    }
679
680    /// Get the media manager of the client.
681    pub fn media(&self) -> Media {
682        Media::new(self.clone())
683    }
684
685    /// Get the pusher manager of the client.
686    pub fn pusher(&self) -> Pusher {
687        Pusher::new(self.clone())
688    }
689
690    /// Access the OAuth 2.0 API of the client.
691    pub fn oauth(&self) -> OAuth {
692        OAuth::new(self.clone())
693    }
694
695    /// Register a handler for a specific event type.
696    ///
697    /// The handler is a function or closure with one or more arguments. The
698    /// first argument is the event itself. All additional arguments are
699    /// "context" arguments: They have to implement [`EventHandlerContext`].
700    /// This trait is named that way because most of the types implementing it
701    /// give additional context about an event: The room it was in, its raw form
702    /// and other similar things. As two exceptions to this,
703    /// [`Client`] and [`EventHandlerHandle`] also implement the
704    /// `EventHandlerContext` trait so you don't have to clone your client
705    /// into the event handler manually and a handler can decide to remove
706    /// itself.
707    ///
708    /// Some context arguments are not universally applicable. A context
709    /// argument that isn't available for the given event type will result in
710    /// the event handler being skipped and an error being logged. The following
711    /// context argument types are only available for a subset of event types:
712    ///
713    /// * [`Room`] is only available for room-specific events, i.e. not for
714    ///   events like global account data events or presence events.
715    ///
716    /// You can provide custom context via
717    /// [`add_event_handler_context`](Client::add_event_handler_context) and
718    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
719    /// into the event handler.
720    ///
721    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
722    ///
723    /// # Examples
724    ///
725    /// ```no_run
726    /// use matrix_sdk::{
727    ///     deserialized_responses::EncryptionInfo,
728    ///     event_handler::Ctx,
729    ///     ruma::{
730    ///         events::{
731    ///             macros::EventContent,
732    ///             push_rules::PushRulesEvent,
733    ///             room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
734    ///         },
735    ///         push::Action,
736    ///         Int, MilliSecondsSinceUnixEpoch,
737    ///     },
738    ///     Client, Room,
739    /// };
740    /// use serde::{Deserialize, Serialize};
741    ///
742    /// # async fn example(client: Client) {
743    /// client.add_event_handler(
744    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
745    ///         // Common usage: Room event plus room and client.
746    ///     },
747    /// );
748    /// client.add_event_handler(
749    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
750    ///         async move {
751    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
752    ///             // unencrypted events and events that were decrypted by the SDK.
753    ///         }
754    ///     },
755    /// );
756    /// client.add_event_handler(
757    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
758    ///         async move {
759    ///             // A `Vec<Action>` parameter allows you to know which push actions
760    ///             // are applicable for an event. For example, an event with
761    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
762    ///             // in the timeline.
763    ///         }
764    ///     },
765    /// );
766    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
767    ///     // You can omit any or all arguments after the first.
768    /// });
769    ///
770    /// // Registering a temporary event handler:
771    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
772    ///     /* Event handler */
773    /// });
774    /// client.remove_event_handler(handle);
775    ///
776    /// // Registering custom event handler context:
777    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
778    /// struct MyContext {
779    ///     number: usize,
780    /// }
781    /// client.add_event_handler_context(MyContext { number: 5 });
782    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
783    ///     // Use the context
784    /// });
785    ///
786    /// // Custom events work exactly the same way, you just need to declare
787    /// // the content struct and use the EventContent derive macro on it.
788    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
789    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
790    /// struct TokenEventContent {
791    ///     token: String,
792    ///     #[serde(rename = "exp")]
793    ///     expires_at: MilliSecondsSinceUnixEpoch,
794    /// }
795    ///
796    /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
797    ///     todo!("Display the token");
798    /// });
799    ///
800    /// // Event handler closures can also capture local variables.
801    /// // Make sure they are cheap to clone though, because they will be cloned
802    /// // every time the closure is called.
803    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
804    ///
805    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
806    ///     println!("Calling the handler with identifier {data}");
807    /// });
808    /// # }
809    /// ```
810    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
811    where
812        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
813        H: EventHandler<Ev, Ctx>,
814    {
815        self.add_event_handler_impl(handler, None)
816    }
817
818    /// Register a handler for a specific room, and event type.
819    ///
820    /// This method works the same way as
821    /// [`add_event_handler`][Self::add_event_handler], except that the handler
822    /// will only be called for events in the room with the specified ID. See
823    /// that method for more details on event handler functions.
824    ///
825    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
826    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
827    /// your use case.
828    pub fn add_room_event_handler<Ev, Ctx, H>(
829        &self,
830        room_id: &RoomId,
831        handler: H,
832    ) -> EventHandlerHandle
833    where
834        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
835        H: EventHandler<Ev, Ctx>,
836    {
837        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
838    }
839
840    /// Observe a specific event type.
841    ///
842    /// `Ev` represents the kind of event that will be observed. `Ctx`
843    /// represents the context that will come with the event. It relies on the
844    /// same mechanism as [`Client::add_event_handler`]. The main difference is
845    /// that it returns an [`ObservableEventHandler`] and doesn't require a
846    /// user-defined closure. It is possible to subscribe to the
847    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
848    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
849    /// Ctx)`.
850    ///
851    /// Be careful that only the most recent value can be observed. Subscribers
852    /// are notified when a new value is sent, but there is no guarantee
853    /// that they will see all values.
854    ///
855    /// # Example
856    ///
857    /// Let's see a classical usage:
858    ///
859    /// ```
860    /// use futures_util::StreamExt as _;
861    /// use matrix_sdk::{
862    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
863    ///     Client, Room,
864    /// };
865    ///
866    /// # async fn example(client: Client) -> Option<()> {
867    /// let observer =
868    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
869    ///
870    /// let mut subscriber = observer.subscribe();
871    ///
872    /// let (event, (room, push_actions)) = subscriber.next().await?;
873    /// # Some(())
874    /// # }
875    /// ```
876    ///
877    /// Now let's see how to get several contexts that can be useful for you:
878    ///
879    /// ```
880    /// use matrix_sdk::{
881    ///     deserialized_responses::EncryptionInfo,
882    ///     ruma::{
883    ///         events::room::{
884    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
885    ///         },
886    ///         push::Action,
887    ///     },
888    ///     Client, Room,
889    /// };
890    ///
891    /// # async fn example(client: Client) {
892    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
893    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
894    ///
895    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
896    /// // to distinguish between unencrypted events and events that were decrypted
897    /// // by the SDK.
898    /// let _ = client
899    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
900    ///     );
901    ///
902    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
903    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
904    /// // should be highlighted in the timeline.
905    /// let _ =
906    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
907    ///
908    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
909    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
910    /// # }
911    /// ```
912    ///
913    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
914    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
915    where
916        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
917        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
918    {
919        self.observe_room_events_impl(None)
920    }
921
922    /// Observe a specific room, and event type.
923    ///
924    /// This method works the same way as [`Client::observe_events`], except
925    /// that the observability will only be applied for events in the room with
926    /// the specified ID. See that method for more details.
927    ///
928    /// Be careful that only the most recent value can be observed. Subscribers
929    /// are notified when a new value is sent, but there is no guarantee
930    /// that they will see all values.
931    pub fn observe_room_events<Ev, Ctx>(
932        &self,
933        room_id: &RoomId,
934    ) -> ObservableEventHandler<(Ev, Ctx)>
935    where
936        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
937        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
938    {
939        self.observe_room_events_impl(Some(room_id.to_owned()))
940    }
941
942    /// Shared implementation for `Client::observe_events` and
943    /// `Client::observe_room_events`.
944    fn observe_room_events_impl<Ev, Ctx>(
945        &self,
946        room_id: Option<OwnedRoomId>,
947    ) -> ObservableEventHandler<(Ev, Ctx)>
948    where
949        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
950        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
951    {
952        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
953        // new value.
954        let shared_observable = SharedObservable::new(None);
955
956        ObservableEventHandler::new(
957            shared_observable.clone(),
958            self.event_handler_drop_guard(self.add_event_handler_impl(
959                move |event: Ev, context: Ctx| {
960                    shared_observable.set(Some((event, context)));
961
962                    ready(())
963                },
964                room_id,
965            )),
966        )
967    }
968
969    /// Remove the event handler associated with the handle.
970    ///
971    /// Note that you **must not** call `remove_event_handler` from the
972    /// non-async part of an event handler, that is:
973    ///
974    /// ```ignore
975    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
976    ///     // ⚠ this will cause a deadlock ⚠
977    ///     client.remove_event_handler(handle);
978    ///
979    ///     async move {
980    ///         // removing the event handler here is fine
981    ///         client.remove_event_handler(handle);
982    ///     }
983    /// })
984    /// ```
985    ///
986    /// Note also that handlers that remove themselves will still execute with
987    /// events received in the same sync cycle.
988    ///
989    /// # Arguments
990    ///
991    /// `handle` - The [`EventHandlerHandle`] that is returned when
992    /// registering the event handler with [`Client::add_event_handler`].
993    ///
994    /// # Examples
995    ///
996    /// ```no_run
997    /// # use url::Url;
998    /// # use tokio::sync::mpsc;
999    /// #
1000    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1001    /// #
1002    /// use matrix_sdk::{
1003    ///     event_handler::EventHandlerHandle,
1004    ///     ruma::events::room::member::SyncRoomMemberEvent, Client,
1005    /// };
1006    /// #
1007    /// # futures_executor::block_on(async {
1008    /// # let client = matrix_sdk::Client::builder()
1009    /// #     .homeserver_url(homeserver)
1010    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1011    /// #     .build()
1012    /// #     .await
1013    /// #     .unwrap();
1014    ///
1015    /// client.add_event_handler(
1016    ///     |ev: SyncRoomMemberEvent,
1017    ///      client: Client,
1018    ///      handle: EventHandlerHandle| async move {
1019    ///         // Common usage: Check arriving Event is the expected one
1020    ///         println!("Expected RoomMemberEvent received!");
1021    ///         client.remove_event_handler(handle);
1022    ///     },
1023    /// );
1024    /// # });
1025    /// ```
1026    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1027        self.inner.event_handlers.remove(handle);
1028    }
1029
1030    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1031    /// the given handle.
1032    ///
1033    /// When the returned value is dropped, the event handler will be removed.
1034    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1035        EventHandlerDropGuard::new(handle, self.clone())
1036    }
1037
1038    /// Add an arbitrary value for use as event handler context.
1039    ///
1040    /// The value can be obtained in an event handler by adding an argument of
1041    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1042    ///
1043    /// If a value of the same type has been added before, it will be
1044    /// overwritten.
1045    ///
1046    /// # Examples
1047    ///
1048    /// ```no_run
1049    /// use matrix_sdk::{
1050    ///     event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1051    ///     Room,
1052    /// };
1053    /// # #[derive(Clone)]
1054    /// # struct SomeType;
1055    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1056    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1057    /// # futures_executor::block_on(async {
1058    /// # let client = matrix_sdk::Client::builder()
1059    /// #     .homeserver_url(homeserver)
1060    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1061    /// #     .build()
1062    /// #     .await
1063    /// #     .unwrap();
1064    ///
1065    /// // Handle used to send messages to the UI part of the app
1066    /// let my_gui_handle: SomeType = obtain_gui_handle();
1067    ///
1068    /// client.add_event_handler_context(my_gui_handle.clone());
1069    /// client.add_event_handler(
1070    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1071    ///         async move {
1072    ///             // gui_handle.send(DisplayMessage { message: ev });
1073    ///         }
1074    ///     },
1075    /// );
1076    /// # });
1077    /// ```
1078    pub fn add_event_handler_context<T>(&self, ctx: T)
1079    where
1080        T: Clone + Send + Sync + 'static,
1081    {
1082        self.inner.event_handlers.add_context(ctx);
1083    }
1084
1085    /// Register a handler for a notification.
1086    ///
1087    /// Similar to [`Client::add_event_handler`], but only allows functions
1088    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1089    /// [`Client`] for now.
1090    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1091    where
1092        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1093        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1094    {
1095        self.inner.notification_handlers.write().await.push(Box::new(
1096            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1097        ));
1098
1099        self
1100    }
1101
1102    /// Subscribe to all updates for the room with the given ID.
1103    ///
1104    /// The returned receiver will receive a new message for each sync response
1105    /// that contains updates for that room.
1106    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1107        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1108            btree_map::Entry::Vacant(entry) => {
1109                let (tx, rx) = broadcast::channel(8);
1110                entry.insert(tx);
1111                rx
1112            }
1113            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1114        }
1115    }
1116
1117    /// Subscribe to all updates to all rooms, whenever any has been received in
1118    /// a sync response.
1119    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1120        self.inner.room_updates_sender.subscribe()
1121    }
1122
1123    pub(crate) async fn notification_handlers(
1124        &self,
1125    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1126        self.inner.notification_handlers.read().await
1127    }
1128
1129    /// Get all the rooms the client knows about.
1130    ///
1131    /// This will return the list of joined, invited, and left rooms.
1132    pub fn rooms(&self) -> Vec<Room> {
1133        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1134    }
1135
1136    /// Get all the rooms the client knows about, filtered by room state.
1137    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1138        self.base_client()
1139            .rooms_filtered(filter)
1140            .into_iter()
1141            .map(|room| Room::new(self.clone(), room))
1142            .collect()
1143    }
1144
1145    /// Get a stream of all the rooms, in addition to the existing rooms.
1146    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1147        let (rooms, stream) = self.base_client().rooms_stream();
1148
1149        let map_room = |room| Room::new(self.clone(), room);
1150
1151        (
1152            rooms.into_iter().map(map_room).collect(),
1153            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1154        )
1155    }
1156
1157    /// Returns the joined rooms this client knows about.
1158    pub fn joined_rooms(&self) -> Vec<Room> {
1159        self.base_client()
1160            .rooms_filtered(RoomStateFilter::JOINED)
1161            .into_iter()
1162            .map(|room| Room::new(self.clone(), room))
1163            .collect()
1164    }
1165
1166    /// Returns the invited rooms this client knows about.
1167    pub fn invited_rooms(&self) -> Vec<Room> {
1168        self.base_client()
1169            .rooms_filtered(RoomStateFilter::INVITED)
1170            .into_iter()
1171            .map(|room| Room::new(self.clone(), room))
1172            .collect()
1173    }
1174
1175    /// Returns the left rooms this client knows about.
1176    pub fn left_rooms(&self) -> Vec<Room> {
1177        self.base_client()
1178            .rooms_filtered(RoomStateFilter::LEFT)
1179            .into_iter()
1180            .map(|room| Room::new(self.clone(), room))
1181            .collect()
1182    }
1183
1184    /// Get a room with the given room id.
1185    ///
1186    /// # Arguments
1187    ///
1188    /// `room_id` - The unique id of the room that should be fetched.
1189    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1190        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1191    }
1192
1193    /// Gets the preview of a room, whether the current user has joined it or
1194    /// not.
1195    pub async fn get_room_preview(
1196        &self,
1197        room_or_alias_id: &RoomOrAliasId,
1198        via: Vec<OwnedServerName>,
1199    ) -> Result<RoomPreview> {
1200        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1201            Ok(room_id) => room_id.to_owned(),
1202            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1203        };
1204
1205        if let Some(room) = self.get_room(&room_id) {
1206            // The cached data can only be trusted if the room state is joined or
1207            // banned: for invite and knock rooms, no updates will be received
1208            // for the rooms after the invite/knock action took place so we may
1209            // have very out to date data for important fields such as
1210            // `join_rule`. For left rooms, the homeserver should return the latest info.
1211            match room.state() {
1212                RoomState::Joined | RoomState::Banned => {
1213                    return Ok(RoomPreview::from_known_room(&room).await);
1214                }
1215                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1216            }
1217        }
1218
1219        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1220    }
1221
1222    /// Resolve a room alias to a room id and a list of servers which know
1223    /// about it.
1224    ///
1225    /// # Arguments
1226    ///
1227    /// `room_alias` - The room alias to be resolved.
1228    pub async fn resolve_room_alias(
1229        &self,
1230        room_alias: &RoomAliasId,
1231    ) -> HttpResult<get_alias::v3::Response> {
1232        let request = get_alias::v3::Request::new(room_alias.to_owned());
1233        self.send(request).await
1234    }
1235
1236    /// Checks if a room alias is not in use yet.
1237    ///
1238    /// Returns:
1239    /// - `Ok(true)` if the room alias is available.
1240    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1241    ///   status code).
1242    /// - An `Err` otherwise.
1243    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1244        match self.resolve_room_alias(alias).await {
1245            // The room alias was resolved, so it's already in use.
1246            Ok(_) => Ok(false),
1247            Err(error) => {
1248                match error.client_api_error_kind() {
1249                    // The room alias wasn't found, so it's available.
1250                    Some(ErrorKind::NotFound) => Ok(true),
1251                    _ => Err(error),
1252                }
1253            }
1254        }
1255    }
1256
1257    /// Adds a new room alias associated with a room to the room directory.
1258    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1259        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1260        self.send(request).await?;
1261        Ok(())
1262    }
1263
1264    /// Removes a room alias from the room directory.
1265    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1266        let request = delete_alias::v3::Request::new(alias.to_owned());
1267        self.send(request).await?;
1268        Ok(())
1269    }
1270
1271    /// Update the homeserver from the login response well-known if needed.
1272    ///
1273    /// # Arguments
1274    ///
1275    /// * `login_well_known` - The `well_known` field from a successful login
1276    ///   response.
1277    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1278        if self.inner.respect_login_well_known {
1279            if let Some(well_known) = login_well_known {
1280                if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1281                    self.set_homeserver(homeserver);
1282                }
1283            }
1284        }
1285    }
1286
1287    /// Similar to [`Client::restore_session_with`], with
1288    /// [`RoomLoadSettings::default()`].
1289    ///
1290    /// # Panics
1291    ///
1292    /// Panics if a session was already restored or logged in.
1293    #[instrument(skip_all)]
1294    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1295        self.restore_session_with(session, RoomLoadSettings::default()).await
1296    }
1297
1298    /// Restore a session previously logged-in using one of the available
1299    /// authentication APIs. The number of rooms to restore is controlled by
1300    /// [`RoomLoadSettings`].
1301    ///
1302    /// See the documentation of the corresponding authentication API's
1303    /// `restore_session` method for more information.
1304    ///
1305    /// # Panics
1306    ///
1307    /// Panics if a session was already restored or logged in.
1308    #[instrument(skip_all)]
1309    pub async fn restore_session_with(
1310        &self,
1311        session: impl Into<AuthSession>,
1312        room_load_settings: RoomLoadSettings,
1313    ) -> Result<()> {
1314        let session = session.into();
1315        match session {
1316            AuthSession::Matrix(session) => {
1317                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1318            }
1319            AuthSession::OAuth(session) => {
1320                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1321            }
1322        }
1323    }
1324
1325    /// Refresh the access token using the authentication API used to log into
1326    /// this session.
1327    ///
1328    /// See the documentation of the authentication API's `refresh_access_token`
1329    /// method for more information.
1330    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1331        let Some(auth_api) = self.auth_api() else {
1332            return Err(RefreshTokenError::RefreshTokenRequired);
1333        };
1334
1335        match auth_api {
1336            AuthApi::Matrix(api) => {
1337                trace!("Token refresh: Using the homeserver.");
1338                Box::pin(api.refresh_access_token()).await?;
1339            }
1340            AuthApi::OAuth(api) => {
1341                trace!("Token refresh: Using OAuth 2.0.");
1342                Box::pin(api.refresh_access_token()).await?;
1343            }
1344        }
1345
1346        Ok(())
1347    }
1348
1349    /// Log out the current session using the proper authentication API.
1350    ///
1351    /// # Errors
1352    ///
1353    /// Returns an error if the session is not authenticated or if an error
1354    /// occurred while making the request to the server.
1355    pub async fn logout(&self) -> Result<(), Error> {
1356        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1357        match auth_api {
1358            AuthApi::Matrix(matrix_auth) => {
1359                matrix_auth.logout().await?;
1360                Ok(())
1361            }
1362            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1363        }
1364    }
1365
1366    /// Get or upload a sync filter.
1367    ///
1368    /// This method will either get a filter ID from the store or upload the
1369    /// filter definition to the homeserver and return the new filter ID.
1370    ///
1371    /// # Arguments
1372    ///
1373    /// * `filter_name` - The unique name of the filter, this name will be used
1374    /// locally to store and identify the filter ID returned by the server.
1375    ///
1376    /// * `definition` - The filter definition that should be uploaded to the
1377    /// server if no filter ID can be found in the store.
1378    ///
1379    /// # Examples
1380    ///
1381    /// ```no_run
1382    /// # use matrix_sdk::{
1383    /// #    Client, config::SyncSettings,
1384    /// #    ruma::api::client::{
1385    /// #        filter::{
1386    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1387    /// #        },
1388    /// #        sync::sync_events::v3::Filter,
1389    /// #    }
1390    /// # };
1391    /// # use url::Url;
1392    /// # async {
1393    /// # let homeserver = Url::parse("http://example.com").unwrap();
1394    /// # let client = Client::new(homeserver).await.unwrap();
1395    /// let mut filter = FilterDefinition::default();
1396    ///
1397    /// // Let's enable member lazy loading.
1398    /// filter.room.state.lazy_load_options =
1399    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1400    ///
1401    /// let filter_id = client
1402    ///     .get_or_upload_filter("sync", filter)
1403    ///     .await
1404    ///     .unwrap();
1405    ///
1406    /// let sync_settings = SyncSettings::new()
1407    ///     .filter(Filter::FilterId(filter_id));
1408    ///
1409    /// let response = client.sync_once(sync_settings).await.unwrap();
1410    /// # };
1411    #[instrument(skip(self, definition))]
1412    pub async fn get_or_upload_filter(
1413        &self,
1414        filter_name: &str,
1415        definition: FilterDefinition,
1416    ) -> Result<String> {
1417        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1418            debug!("Found filter locally");
1419            Ok(filter)
1420        } else {
1421            debug!("Didn't find filter locally");
1422            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1423            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1424            let response = self.send(request).await?;
1425
1426            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1427
1428            Ok(response.filter_id)
1429        }
1430    }
1431
1432    /// Finish joining a room.
1433    ///
1434    /// If the room was an invite that should be marked as a DM, will include it
1435    /// in the DM event after creating the joined room.
1436    async fn finish_join_room(&self, room_id: &RoomId) -> Result<Room> {
1437        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1438            room.state() == RoomState::Invited
1439                && room.is_direct().await.unwrap_or_else(|e| {
1440                    warn!(%room_id, "is_direct() failed: {e}");
1441                    false
1442                })
1443        } else {
1444            false
1445        };
1446
1447        let base_room = self.base_client().room_joined(room_id).await?;
1448        let room = Room::new(self.clone(), base_room);
1449
1450        if mark_as_dm {
1451            room.set_is_direct(true).await?;
1452        }
1453
1454        Ok(room)
1455    }
1456
1457    /// Join a room by `RoomId`.
1458    ///
1459    /// Returns the `Room` in the joined state.
1460    ///
1461    /// # Arguments
1462    ///
1463    /// * `room_id` - The `RoomId` of the room to be joined.
1464    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1465        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1466        let response = self.send(request).await?;
1467        self.finish_join_room(&response.room_id).await
1468    }
1469
1470    /// Join a room by `RoomOrAliasId`.
1471    ///
1472    /// Returns the `Room` in the joined state.
1473    ///
1474    /// # Arguments
1475    ///
1476    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1477    ///   alias looks like `#name:example.com`.
1478    /// * `server_names` - The server names to be used for resolving the alias,
1479    ///   if needs be.
1480    pub async fn join_room_by_id_or_alias(
1481        &self,
1482        alias: &RoomOrAliasId,
1483        server_names: &[OwnedServerName],
1484    ) -> Result<Room> {
1485        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1486            via: server_names.to_owned(),
1487        });
1488        let response = self.send(request).await?;
1489        self.finish_join_room(&response.room_id).await
1490    }
1491
1492    /// Search the homeserver's directory of public rooms.
1493    ///
1494    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1495    /// a `get_public_rooms::Response`.
1496    ///
1497    /// # Arguments
1498    ///
1499    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1500    ///
1501    /// * `since` - Pagination token from a previous request.
1502    ///
1503    /// * `server` - The name of the server, if `None` the requested server is
1504    ///   used.
1505    ///
1506    /// # Examples
1507    /// ```no_run
1508    /// use matrix_sdk::Client;
1509    /// # use url::Url;
1510    /// # let homeserver = Url::parse("http://example.com").unwrap();
1511    /// # let limit = Some(10);
1512    /// # let since = Some("since token");
1513    /// # let server = Some("servername.com".try_into().unwrap());
1514    /// # async {
1515    /// let mut client = Client::new(homeserver).await.unwrap();
1516    ///
1517    /// client.public_rooms(limit, since, server).await;
1518    /// # };
1519    /// ```
1520    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1521    pub async fn public_rooms(
1522        &self,
1523        limit: Option<u32>,
1524        since: Option<&str>,
1525        server: Option<&ServerName>,
1526    ) -> HttpResult<get_public_rooms::v3::Response> {
1527        let limit = limit.map(UInt::from);
1528
1529        let request = assign!(get_public_rooms::v3::Request::new(), {
1530            limit,
1531            since: since.map(ToOwned::to_owned),
1532            server: server.map(ToOwned::to_owned),
1533        });
1534        self.send(request).await
1535    }
1536
1537    /// Create a room with the given parameters.
1538    ///
1539    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1540    /// created room.
1541    ///
1542    /// If you want to create a direct message with one specific user, you can
1543    /// use [`create_dm`][Self::create_dm], which is more convenient than
1544    /// assembling the [`create_room::v3::Request`] yourself.
1545    ///
1546    /// If the `is_direct` field of the request is set to `true` and at least
1547    /// one user is invited, the room will be automatically added to the direct
1548    /// rooms in the account data.
1549    ///
1550    /// # Examples
1551    ///
1552    /// ```no_run
1553    /// use matrix_sdk::{
1554    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1555    ///     Client,
1556    /// };
1557    /// # use url::Url;
1558    /// #
1559    /// # async {
1560    /// # let homeserver = Url::parse("http://example.com").unwrap();
1561    /// let request = CreateRoomRequest::new();
1562    /// let client = Client::new(homeserver).await.unwrap();
1563    /// assert!(client.create_room(request).await.is_ok());
1564    /// # };
1565    /// ```
1566    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1567        let invite = request.invite.clone();
1568        let is_direct_room = request.is_direct;
1569        let response = self.send(request).await?;
1570
1571        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1572
1573        let joined_room = Room::new(self.clone(), base_room);
1574
1575        if is_direct_room && !invite.is_empty() {
1576            if let Err(error) =
1577                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1578            {
1579                // FIXME: Retry in the background
1580                error!("Failed to mark room as DM: {error}");
1581            }
1582        }
1583
1584        Ok(joined_room)
1585    }
1586
1587    /// Create a DM room.
1588    ///
1589    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1590    /// given user being invited, the room marked `is_direct` and both the
1591    /// creator and invitee getting the default maximum power level.
1592    ///
1593    /// If the `e2e-encryption` feature is enabled, the room will also be
1594    /// encrypted.
1595    ///
1596    /// # Arguments
1597    ///
1598    /// * `user_id` - The ID of the user to create a DM for.
1599    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1600        #[cfg(feature = "e2e-encryption")]
1601        let initial_state =
1602            vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1603                .to_raw_any()];
1604
1605        #[cfg(not(feature = "e2e-encryption"))]
1606        let initial_state = vec![];
1607
1608        let request = assign!(create_room::v3::Request::new(), {
1609            invite: vec![user_id.to_owned()],
1610            is_direct: true,
1611            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1612            initial_state,
1613        });
1614
1615        self.create_room(request).await
1616    }
1617
1618    /// Search the homeserver's directory for public rooms with a filter.
1619    ///
1620    /// # Arguments
1621    ///
1622    /// * `room_search` - The easiest way to create this request is using the
1623    ///   `get_public_rooms_filtered::Request` itself.
1624    ///
1625    /// # Examples
1626    ///
1627    /// ```no_run
1628    /// # use url::Url;
1629    /// # use matrix_sdk::Client;
1630    /// # async {
1631    /// # let homeserver = Url::parse("http://example.com")?;
1632    /// use matrix_sdk::ruma::{
1633    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1634    /// };
1635    /// # let mut client = Client::new(homeserver).await?;
1636    ///
1637    /// let mut filter = Filter::new();
1638    /// filter.generic_search_term = Some("rust".to_owned());
1639    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1640    /// request.filter = filter;
1641    ///
1642    /// let response = client.public_rooms_filtered(request).await?;
1643    ///
1644    /// for room in response.chunk {
1645    ///     println!("Found room {room:?}");
1646    /// }
1647    /// # anyhow::Ok(()) };
1648    /// ```
1649    pub async fn public_rooms_filtered(
1650        &self,
1651        request: get_public_rooms_filtered::v3::Request,
1652    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1653        self.send(request).await
1654    }
1655
1656    /// Send an arbitrary request to the server, without updating client state.
1657    ///
1658    /// **Warning:** Because this method *does not* update the client state, it
1659    /// is important to make sure that you account for this yourself, and
1660    /// use wrapper methods where available.  This method should *only* be
1661    /// used if a wrapper method for the endpoint you'd like to use is not
1662    /// available.
1663    ///
1664    /// # Arguments
1665    ///
1666    /// * `request` - A filled out and valid request for the endpoint to be hit
1667    ///
1668    /// * `timeout` - An optional request timeout setting, this overrides the
1669    ///   default request setting if one was set.
1670    ///
1671    /// # Examples
1672    ///
1673    /// ```no_run
1674    /// # use matrix_sdk::{Client, config::SyncSettings};
1675    /// # use url::Url;
1676    /// # async {
1677    /// # let homeserver = Url::parse("http://localhost:8080")?;
1678    /// # let mut client = Client::new(homeserver).await?;
1679    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1680    ///
1681    /// // First construct the request you want to make
1682    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1683    /// // for all available Endpoints
1684    /// let user_id = user_id!("@example:localhost").to_owned();
1685    /// let request = profile::get_profile::v3::Request::new(user_id);
1686    ///
1687    /// // Start the request using Client::send()
1688    /// let response = client.send(request).await?;
1689    ///
1690    /// // Check the corresponding Response struct to find out what types are
1691    /// // returned
1692    /// # anyhow::Ok(()) };
1693    /// ```
1694    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1695    where
1696        Request: OutgoingRequest + Clone + Debug,
1697        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1698    {
1699        SendRequest {
1700            client: self.clone(),
1701            request,
1702            config: None,
1703            send_progress: Default::default(),
1704        }
1705    }
1706
1707    pub(crate) async fn send_inner<Request>(
1708        &self,
1709        request: Request,
1710        config: Option<RequestConfig>,
1711        send_progress: SharedObservable<TransmissionProgress>,
1712    ) -> HttpResult<Request::IncomingResponse>
1713    where
1714        Request: OutgoingRequest + Debug,
1715        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1716    {
1717        let homeserver = self.homeserver().to_string();
1718        let access_token = self.access_token();
1719
1720        self.inner
1721            .http_client
1722            .send(
1723                request,
1724                config,
1725                homeserver,
1726                access_token.as_deref(),
1727                &self.server_versions().await?,
1728                send_progress,
1729            )
1730            .await
1731    }
1732
1733    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1734        _ = self
1735            .inner
1736            .auth_ctx
1737            .session_change_sender
1738            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1739    }
1740
1741    /// Fetches server versions from network; no caching.
1742    pub async fn fetch_server_versions(
1743        &self,
1744        request_config: Option<RequestConfig>,
1745    ) -> HttpResult<get_supported_versions::Response> {
1746        let server_versions = self
1747            .inner
1748            .http_client
1749            .send(
1750                get_supported_versions::Request::new(),
1751                request_config,
1752                self.homeserver().to_string(),
1753                None,
1754                &[MatrixVersion::V1_0],
1755                Default::default(),
1756            )
1757            .await?;
1758
1759        Ok(server_versions)
1760    }
1761
1762    /// Fetches client well_known from network; no caching.
1763    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1764        let server_url_string = self
1765            .server()
1766            .unwrap_or(
1767                // Sometimes people configure their well-known directly on the homeserver so use
1768                // this as a fallback when the server name is unknown.
1769                &self.homeserver(),
1770            )
1771            .to_string();
1772
1773        let well_known = self
1774            .inner
1775            .http_client
1776            .send(
1777                discover_homeserver::Request::new(),
1778                Some(RequestConfig::short_retry()),
1779                server_url_string,
1780                None,
1781                &[MatrixVersion::V1_0],
1782                Default::default(),
1783            )
1784            .await;
1785
1786        match well_known {
1787            Ok(well_known) => Some(well_known),
1788            Err(http_error) => {
1789                // It is perfectly valid to not have a well-known file.
1790                // Maybe we should check for a specific error code to be sure?
1791                warn!("Failed to fetch client well-known: {http_error}");
1792                None
1793            }
1794        }
1795    }
1796
1797    /// Load server info from storage, or fetch them from network and cache
1798    /// them.
1799    async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1800        match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1801            Ok(Some(stored)) => {
1802                if let Some(server_info) =
1803                    stored.into_server_info().and_then(|info| info.maybe_decode())
1804                {
1805                    return Ok(server_info);
1806                }
1807            }
1808            Ok(None) => {
1809                // fallthrough: cache is empty
1810            }
1811            Err(err) => {
1812                warn!("error when loading cached server info: {err}");
1813                // fallthrough to network.
1814            }
1815        }
1816
1817        let server_versions = self.fetch_server_versions(None).await?;
1818        let well_known = self.fetch_client_well_known().await;
1819        let server_info = ServerInfo::new(
1820            server_versions.versions.clone(),
1821            server_versions.unstable_features.clone(),
1822            well_known.map(Into::into),
1823        );
1824
1825        // Attempt to cache the result in storage.
1826        {
1827            if let Err(err) = self
1828                .state_store()
1829                .set_kv_data(
1830                    StateStoreDataKey::ServerInfo,
1831                    StateStoreDataValue::ServerInfo(server_info.clone()),
1832                )
1833                .await
1834            {
1835                warn!("error when caching server info: {err}");
1836            }
1837        }
1838
1839        Ok(server_info)
1840    }
1841
1842    async fn get_or_load_and_cache_server_info<
1843        Value,
1844        MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
1845    >(
1846        &self,
1847        map: MapFunction,
1848    ) -> HttpResult<Value> {
1849        let server_info = &self.inner.caches.server_info;
1850        if let CachedValue::Cached(val) = map(&*server_info.read().await) {
1851            return Ok(val);
1852        }
1853
1854        let mut guarded_server_info = server_info.write().await;
1855        if let CachedValue::Cached(val) = map(&guarded_server_info) {
1856            return Ok(val);
1857        }
1858
1859        let server_info = self.load_or_fetch_server_info().await?;
1860
1861        // Fill both unstable features and server versions at once.
1862        let mut versions = server_info.known_versions();
1863        if versions.is_empty() {
1864            versions.push(MatrixVersion::V1_0);
1865        }
1866
1867        guarded_server_info.server_versions = CachedValue::Cached(versions.into());
1868        guarded_server_info.unstable_features = CachedValue::Cached(server_info.unstable_features);
1869        guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
1870
1871        // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
1872        // fetch an optional property), the function will always return some.
1873        Ok(map(&guarded_server_info).unwrap_cached_value())
1874    }
1875
1876    /// Get the Matrix versions supported by the homeserver by fetching them
1877    /// from the server or the cache.
1878    ///
1879    /// # Examples
1880    ///
1881    /// ```no_run
1882    /// use ruma::api::MatrixVersion;
1883    /// # use matrix_sdk::{Client, config::SyncSettings};
1884    /// # use url::Url;
1885    /// # async {
1886    /// # let homeserver = Url::parse("http://localhost:8080")?;
1887    /// # let mut client = Client::new(homeserver).await?;
1888    ///
1889    /// let server_versions = client.server_versions().await?;
1890    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
1891    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1892    /// # anyhow::Ok(()) };
1893    /// ```
1894    pub async fn server_versions(&self) -> HttpResult<Box<[MatrixVersion]>> {
1895        self.get_or_load_and_cache_server_info(|server_info| server_info.server_versions.clone())
1896            .await
1897    }
1898
1899    /// Get the unstable features supported by the homeserver by fetching them
1900    /// from the server or the cache.
1901    ///
1902    /// # Examples
1903    ///
1904    /// ```no_run
1905    /// # use matrix_sdk::{Client, config::SyncSettings};
1906    /// # use url::Url;
1907    /// # async {
1908    /// # let homeserver = Url::parse("http://localhost:8080")?;
1909    /// # let mut client = Client::new(homeserver).await?;
1910    /// let unstable_features = client.unstable_features().await?;
1911    /// let supports_msc_x =
1912    ///     unstable_features.get("msc_x").copied().unwrap_or(false);
1913    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
1914    /// # anyhow::Ok(()) };
1915    /// ```
1916    pub async fn unstable_features(&self) -> HttpResult<BTreeMap<String, bool>> {
1917        self.get_or_load_and_cache_server_info(|server_info| server_info.unstable_features.clone())
1918            .await
1919    }
1920
1921    /// Get information about the homeserver's advertised RTC foci by fetching
1922    /// the well-known file from the server or the cache.
1923    ///
1924    /// # Examples
1925    /// ```no_run
1926    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
1927    /// # use url::Url;
1928    /// # async {
1929    /// # let homeserver = Url::parse("http://localhost:8080")?;
1930    /// # let mut client = Client::new(homeserver).await?;
1931    /// let rtc_foci = client.rtc_foci().await?;
1932    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
1933    ///     RtcFocusInfo::LiveKit(info) => Some(info),
1934    ///     _ => None,
1935    /// });
1936    /// if let Some(info) = default_livekit_focus_info {
1937    ///     println!("Default LiveKit service URL: {}", info.service_url);
1938    /// }
1939    /// # anyhow::Ok(()) };
1940    /// ```
1941    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
1942        let well_known = self
1943            .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
1944            .await?;
1945
1946        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
1947    }
1948
1949    /// Empty the server version and unstable features cache.
1950    ///
1951    /// Since the SDK caches server info (versions, unstable features,
1952    /// well-known etc), it's possible to have a stale entry in the cache. This
1953    /// functions makes it possible to force reset it.
1954    pub async fn reset_server_info(&self) -> Result<()> {
1955        // Empty the in-memory caches.
1956        let mut guard = self.inner.caches.server_info.write().await;
1957        guard.server_versions = CachedValue::NotSet;
1958        guard.unstable_features = CachedValue::NotSet;
1959
1960        // Empty the store cache.
1961        Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
1962    }
1963
1964    /// Check whether MSC 4028 is enabled on the homeserver.
1965    ///
1966    /// # Examples
1967    ///
1968    /// ```no_run
1969    /// # use matrix_sdk::{Client, config::SyncSettings};
1970    /// # use url::Url;
1971    /// # async {
1972    /// # let homeserver = Url::parse("http://localhost:8080")?;
1973    /// # let mut client = Client::new(homeserver).await?;
1974    /// let msc4028_enabled =
1975    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
1976    /// # anyhow::Ok(()) };
1977    /// ```
1978    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
1979        Ok(self.unstable_features().await?.get("org.matrix.msc4028").copied().unwrap_or(false))
1980    }
1981
1982    /// Get information of all our own devices.
1983    ///
1984    /// # Examples
1985    ///
1986    /// ```no_run
1987    /// # use matrix_sdk::{Client, config::SyncSettings};
1988    /// # use url::Url;
1989    /// # async {
1990    /// # let homeserver = Url::parse("http://localhost:8080")?;
1991    /// # let mut client = Client::new(homeserver).await?;
1992    /// let response = client.devices().await?;
1993    ///
1994    /// for device in response.devices {
1995    ///     println!(
1996    ///         "Device: {} {}",
1997    ///         device.device_id,
1998    ///         device.display_name.as_deref().unwrap_or("")
1999    ///     );
2000    /// }
2001    /// # anyhow::Ok(()) };
2002    /// ```
2003    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2004        let request = get_devices::v3::Request::new();
2005
2006        self.send(request).await
2007    }
2008
2009    /// Delete the given devices from the server.
2010    ///
2011    /// # Arguments
2012    ///
2013    /// * `devices` - The list of devices that should be deleted from the
2014    ///   server.
2015    ///
2016    /// * `auth_data` - This request requires user interactive auth, the first
2017    ///   request needs to set this to `None` and will always fail with an
2018    ///   `UiaaResponse`. The response will contain information for the
2019    ///   interactive auth and the same request needs to be made but this time
2020    ///   with some `auth_data` provided.
2021    ///
2022    /// ```no_run
2023    /// # use matrix_sdk::{
2024    /// #    ruma::{api::client::uiaa, device_id},
2025    /// #    Client, Error, config::SyncSettings,
2026    /// # };
2027    /// # use serde_json::json;
2028    /// # use url::Url;
2029    /// # use std::collections::BTreeMap;
2030    /// # async {
2031    /// # let homeserver = Url::parse("http://localhost:8080")?;
2032    /// # let mut client = Client::new(homeserver).await?;
2033    /// let devices = &[device_id!("DEVICEID").to_owned()];
2034    ///
2035    /// if let Err(e) = client.delete_devices(devices, None).await {
2036    ///     if let Some(info) = e.as_uiaa_response() {
2037    ///         let mut password = uiaa::Password::new(
2038    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2039    ///             "wordpass".to_owned(),
2040    ///         );
2041    ///         password.session = info.session.clone();
2042    ///
2043    ///         client
2044    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2045    ///             .await?;
2046    ///     }
2047    /// }
2048    /// # anyhow::Ok(()) };
2049    pub async fn delete_devices(
2050        &self,
2051        devices: &[OwnedDeviceId],
2052        auth_data: Option<uiaa::AuthData>,
2053    ) -> HttpResult<delete_devices::v3::Response> {
2054        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2055        request.auth = auth_data;
2056
2057        self.send(request).await
2058    }
2059
2060    /// Change the display name of a device owned by the current user.
2061    ///
2062    /// Returns a `update_device::Response` which specifies the result
2063    /// of the operation.
2064    ///
2065    /// # Arguments
2066    ///
2067    /// * `device_id` - The ID of the device to change the display name of.
2068    /// * `display_name` - The new display name to set.
2069    pub async fn rename_device(
2070        &self,
2071        device_id: &DeviceId,
2072        display_name: &str,
2073    ) -> HttpResult<update_device::v3::Response> {
2074        let mut request = update_device::v3::Request::new(device_id.to_owned());
2075        request.display_name = Some(display_name.to_owned());
2076
2077        self.send(request).await
2078    }
2079
2080    /// Synchronize the client's state with the latest state on the server.
2081    ///
2082    /// ## Syncing Events
2083    ///
2084    /// Messages or any other type of event need to be periodically fetched from
2085    /// the server, this is achieved by sending a `/sync` request to the server.
2086    ///
2087    /// The first sync is sent out without a [`token`]. The response of the
2088    /// first sync will contain a [`next_batch`] field which should then be
2089    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2090    /// don't receive the same events multiple times.
2091    ///
2092    /// ## Long Polling
2093    ///
2094    /// A sync should in the usual case always be in flight. The
2095    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2096    /// long the server will wait for new events before it will respond.
2097    /// The server will respond immediately if some new events arrive before the
2098    /// timeout has expired. If no changes arrive and the timeout expires an
2099    /// empty sync response will be sent to the client.
2100    ///
2101    /// This method of sending a request that may not receive a response
2102    /// immediately is called long polling.
2103    ///
2104    /// ## Filtering Events
2105    ///
2106    /// The number or type of messages and events that the client should receive
2107    /// from the server can be altered using a [`Filter`].
2108    ///
2109    /// Filters can be non-trivial and, since they will be sent with every sync
2110    /// request, they may take up a bunch of unnecessary bandwidth.
2111    ///
2112    /// Luckily filters can be uploaded to the server and reused using an unique
2113    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2114    /// method.
2115    ///
2116    /// # Arguments
2117    ///
2118    /// * `sync_settings` - Settings for the sync call, this allows us to set
2119    /// various options to configure the sync:
2120    ///     * [`filter`] - To configure which events we receive and which get
2121    ///       [filtered] by the server
2122    ///     * [`timeout`] - To configure our [long polling] setup.
2123    ///     * [`token`] - To tell the server which events we already received
2124    ///       and where we wish to continue syncing.
2125    ///     * [`full_state`] - To tell the server that we wish to receive all
2126    ///       state events, regardless of our configured [`token`].
2127    ///     * [`set_presence`] - To tell the server to set the presence and to
2128    ///       which state.
2129    ///
2130    /// # Examples
2131    ///
2132    /// ```no_run
2133    /// # use url::Url;
2134    /// # async {
2135    /// # let homeserver = Url::parse("http://localhost:8080")?;
2136    /// # let username = "";
2137    /// # let password = "";
2138    /// use matrix_sdk::{
2139    ///     config::SyncSettings,
2140    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2141    /// };
2142    ///
2143    /// let client = Client::new(homeserver).await?;
2144    /// client.matrix_auth().login_username(username, password).send().await?;
2145    ///
2146    /// // Sync once so we receive the client state and old messages.
2147    /// client.sync_once(SyncSettings::default()).await?;
2148    ///
2149    /// // Register our handler so we start responding once we receive a new
2150    /// // event.
2151    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2152    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2153    /// });
2154    ///
2155    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2156    /// // from our `sync_once()` call automatically.
2157    /// client.sync(SyncSettings::default()).await;
2158    /// # anyhow::Ok(()) };
2159    /// ```
2160    ///
2161    /// [`sync`]: #method.sync
2162    /// [`SyncSettings`]: crate::config::SyncSettings
2163    /// [`token`]: crate::config::SyncSettings#method.token
2164    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2165    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2166    /// [`set_presence`]: ruma::presence::PresenceState
2167    /// [`filter`]: crate::config::SyncSettings#method.filter
2168    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2169    /// [`next_batch`]: SyncResponse#structfield.next_batch
2170    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2171    /// [long polling]: #long-polling
2172    /// [filtered]: #filtering-events
2173    #[instrument(skip(self))]
2174    pub async fn sync_once(
2175        &self,
2176        sync_settings: crate::config::SyncSettings,
2177    ) -> Result<SyncResponse> {
2178        // The sync might not return for quite a while due to the timeout.
2179        // We'll see if there's anything crypto related to send out before we
2180        // sync, i.e. if we closed our client after a sync but before the
2181        // crypto requests were sent out.
2182        //
2183        // This will mostly be a no-op.
2184        #[cfg(feature = "e2e-encryption")]
2185        if let Err(e) = self.send_outgoing_requests().await {
2186            error!(error = ?e, "Error while sending outgoing E2EE requests");
2187        }
2188
2189        let request = assign!(sync_events::v3::Request::new(), {
2190            filter: sync_settings.filter.map(|f| *f),
2191            since: sync_settings.token,
2192            full_state: sync_settings.full_state,
2193            set_presence: sync_settings.set_presence,
2194            timeout: sync_settings.timeout,
2195        });
2196        let mut request_config = self.request_config();
2197        if let Some(timeout) = sync_settings.timeout {
2198            request_config.timeout += timeout;
2199        }
2200
2201        let response = self.send(request).with_request_config(request_config).await?;
2202        let next_batch = response.next_batch.clone();
2203        let response = self.process_sync(response).await?;
2204
2205        #[cfg(feature = "e2e-encryption")]
2206        if let Err(e) = self.send_outgoing_requests().await {
2207            error!(error = ?e, "Error while sending outgoing E2EE requests");
2208        }
2209
2210        self.inner.sync_beat.notify(usize::MAX);
2211
2212        Ok(SyncResponse::new(next_batch, response))
2213    }
2214
2215    /// Repeatedly synchronize the client state with the server.
2216    ///
2217    /// This method will only return on error, if cancellation is needed
2218    /// the method should be wrapped in a cancelable task or the
2219    /// [`Client::sync_with_callback`] method can be used or
2220    /// [`Client::sync_with_result_callback`] if you want to handle error
2221    /// cases in the loop, too.
2222    ///
2223    /// This method will internally call [`Client::sync_once`] in a loop.
2224    ///
2225    /// This method can be used with the [`Client::add_event_handler`]
2226    /// method to react to individual events. If you instead wish to handle
2227    /// events in a bulk manner the [`Client::sync_with_callback`],
2228    /// [`Client::sync_with_result_callback`] and
2229    /// [`Client::sync_stream`] methods can be used instead. Those methods
2230    /// repeatedly return the whole sync response.
2231    ///
2232    /// # Arguments
2233    ///
2234    /// * `sync_settings` - Settings for the sync call. *Note* that those
2235    ///   settings will be only used for the first sync call. See the argument
2236    ///   docs for [`Client::sync_once`] for more info.
2237    ///
2238    /// # Return
2239    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2240    /// up to the user of the API to check the error and decide whether the sync
2241    /// should continue or not.
2242    ///
2243    /// # Examples
2244    ///
2245    /// ```no_run
2246    /// # use url::Url;
2247    /// # async {
2248    /// # let homeserver = Url::parse("http://localhost:8080")?;
2249    /// # let username = "";
2250    /// # let password = "";
2251    /// use matrix_sdk::{
2252    ///     config::SyncSettings,
2253    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2254    /// };
2255    ///
2256    /// let client = Client::new(homeserver).await?;
2257    /// client.matrix_auth().login_username(&username, &password).send().await?;
2258    ///
2259    /// // Register our handler so we start responding once we receive a new
2260    /// // event.
2261    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2262    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2263    /// });
2264    ///
2265    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2266    /// // automatically.
2267    /// client.sync(SyncSettings::default()).await?;
2268    /// # anyhow::Ok(()) };
2269    /// ```
2270    ///
2271    /// [argument docs]: #method.sync_once
2272    /// [`sync_with_callback`]: #method.sync_with_callback
2273    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2274        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2275    }
2276
2277    /// Repeatedly call sync to synchronize the client state with the server.
2278    ///
2279    /// # Arguments
2280    ///
2281    /// * `sync_settings` - Settings for the sync call. *Note* that those
2282    ///   settings will be only used for the first sync call. See the argument
2283    ///   docs for [`Client::sync_once`] for more info.
2284    ///
2285    /// * `callback` - A callback that will be called every time a successful
2286    ///   response has been fetched from the server. The callback must return a
2287    ///   boolean which signalizes if the method should stop syncing. If the
2288    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2289    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2290    ///
2291    /// # Return
2292    /// The sync runs until an error occurs or the
2293    /// callback indicates that the Loop should stop. If the callback asked for
2294    /// a regular stop, the result will be `Ok(())` otherwise the
2295    /// `Err(Error)` is returned.
2296    ///
2297    /// # Examples
2298    ///
2299    /// The following example demonstrates how to sync forever while sending all
2300    /// the interesting events through a mpsc channel to another thread e.g. a
2301    /// UI thread.
2302    ///
2303    /// ```no_run
2304    /// # use std::time::Duration;
2305    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2306    /// # use url::Url;
2307    /// # async {
2308    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2309    /// # let mut client = Client::new(homeserver).await.unwrap();
2310    ///
2311    /// use tokio::sync::mpsc::channel;
2312    ///
2313    /// let (tx, rx) = channel(100);
2314    ///
2315    /// let sync_channel = &tx;
2316    /// let sync_settings = SyncSettings::new()
2317    ///     .timeout(Duration::from_secs(30));
2318    ///
2319    /// client
2320    ///     .sync_with_callback(sync_settings, |response| async move {
2321    ///         let channel = sync_channel;
2322    ///         for (room_id, room) in response.rooms.joined {
2323    ///             for event in room.timeline.events {
2324    ///                 channel.send(event).await.unwrap();
2325    ///             }
2326    ///         }
2327    ///
2328    ///         LoopCtrl::Continue
2329    ///     })
2330    ///     .await;
2331    /// };
2332    /// ```
2333    #[instrument(skip_all)]
2334    pub async fn sync_with_callback<C>(
2335        &self,
2336        sync_settings: crate::config::SyncSettings,
2337        callback: impl Fn(SyncResponse) -> C,
2338    ) -> Result<(), Error>
2339    where
2340        C: Future<Output = LoopCtrl>,
2341    {
2342        self.sync_with_result_callback(sync_settings, |result| async {
2343            Ok(callback(result?).await)
2344        })
2345        .await
2346    }
2347
2348    /// Repeatedly call sync to synchronize the client state with the server.
2349    ///
2350    /// # Arguments
2351    ///
2352    /// * `sync_settings` - Settings for the sync call. *Note* that those
2353    ///   settings will be only used for the first sync call. See the argument
2354    ///   docs for [`Client::sync_once`] for more info.
2355    ///
2356    /// * `callback` - A callback that will be called every time after a
2357    ///   response has been received, failure or not. The callback returns a
2358    ///   `Result<LoopCtrl, Error>`, too. When returning
2359    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2360    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2361    ///   function returns `Ok(())`. In case the callback can't handle the
2362    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2363    ///   which results in the sync ending and the `Err(Error)` being returned.
2364    ///
2365    /// # Return
2366    /// The sync runs until an error occurs that the callback can't handle or
2367    /// the callback indicates that the Loop should stop. If the callback
2368    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2369    /// `Err(Error)` is returned.
2370    ///
2371    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2372    /// this, and are handled first without sending the result to the
2373    /// callback. Only after they have exceeded is the `Result` handed to
2374    /// the callback.
2375    ///
2376    /// # Examples
2377    ///
2378    /// The following example demonstrates how to sync forever while sending all
2379    /// the interesting events through a mpsc channel to another thread e.g. a
2380    /// UI thread.
2381    ///
2382    /// ```no_run
2383    /// # use std::time::Duration;
2384    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2385    /// # use url::Url;
2386    /// # async {
2387    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2388    /// # let mut client = Client::new(homeserver).await.unwrap();
2389    /// #
2390    /// use tokio::sync::mpsc::channel;
2391    ///
2392    /// let (tx, rx) = channel(100);
2393    ///
2394    /// let sync_channel = &tx;
2395    /// let sync_settings = SyncSettings::new()
2396    ///     .timeout(Duration::from_secs(30));
2397    ///
2398    /// client
2399    ///     .sync_with_result_callback(sync_settings, |response| async move {
2400    ///         let channel = sync_channel;
2401    ///         let sync_response = response?;
2402    ///         for (room_id, room) in sync_response.rooms.joined {
2403    ///              for event in room.timeline.events {
2404    ///                  channel.send(event).await.unwrap();
2405    ///               }
2406    ///         }
2407    ///
2408    ///         Ok(LoopCtrl::Continue)
2409    ///     })
2410    ///     .await;
2411    /// };
2412    /// ```
2413    #[instrument(skip(self, callback))]
2414    pub async fn sync_with_result_callback<C>(
2415        &self,
2416        mut sync_settings: crate::config::SyncSettings,
2417        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2418    ) -> Result<(), Error>
2419    where
2420        C: Future<Output = Result<LoopCtrl, Error>>,
2421    {
2422        let mut last_sync_time: Option<Instant> = None;
2423
2424        if sync_settings.token.is_none() {
2425            sync_settings.token = self.sync_token().await;
2426        }
2427
2428        loop {
2429            trace!("Syncing");
2430            let result = self.sync_loop_helper(&mut sync_settings).await;
2431
2432            trace!("Running callback");
2433            if callback(result).await? == LoopCtrl::Break {
2434                trace!("Callback told us to stop");
2435                break;
2436            }
2437            trace!("Done running callback");
2438
2439            Client::delay_sync(&mut last_sync_time).await
2440        }
2441
2442        Ok(())
2443    }
2444
2445    //// Repeatedly synchronize the client state with the server.
2446    ///
2447    /// This method will internally call [`Client::sync_once`] in a loop and is
2448    /// equivalent to the [`Client::sync`] method but the responses are provided
2449    /// as an async stream.
2450    ///
2451    /// # Arguments
2452    ///
2453    /// * `sync_settings` - Settings for the sync call. *Note* that those
2454    ///   settings will be only used for the first sync call. See the argument
2455    ///   docs for [`Client::sync_once`] for more info.
2456    ///
2457    /// # Examples
2458    ///
2459    /// ```no_run
2460    /// # use url::Url;
2461    /// # async {
2462    /// # let homeserver = Url::parse("http://localhost:8080")?;
2463    /// # let username = "";
2464    /// # let password = "";
2465    /// use futures_util::StreamExt;
2466    /// use matrix_sdk::{config::SyncSettings, Client};
2467    ///
2468    /// let client = Client::new(homeserver).await?;
2469    /// client.matrix_auth().login_username(&username, &password).send().await?;
2470    ///
2471    /// let mut sync_stream =
2472    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2473    ///
2474    /// while let Some(Ok(response)) = sync_stream.next().await {
2475    ///     for room in response.rooms.joined.values() {
2476    ///         for e in &room.timeline.events {
2477    ///             if let Ok(event) = e.raw().deserialize() {
2478    ///                 println!("Received event {:?}", event);
2479    ///             }
2480    ///         }
2481    ///     }
2482    /// }
2483    ///
2484    /// # anyhow::Ok(()) };
2485    /// ```
2486    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2487    #[instrument(skip(self))]
2488    pub async fn sync_stream(
2489        &self,
2490        mut sync_settings: crate::config::SyncSettings,
2491    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2492        let mut last_sync_time: Option<Instant> = None;
2493
2494        if sync_settings.token.is_none() {
2495            sync_settings.token = self.sync_token().await;
2496        }
2497
2498        let parent_span = Span::current();
2499
2500        async_stream::stream! {
2501            loop {
2502                yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2503
2504                Client::delay_sync(&mut last_sync_time).await
2505            }
2506        }
2507    }
2508
2509    /// Get the current, if any, sync token of the client.
2510    /// This will be None if the client didn't sync at least once.
2511    pub(crate) async fn sync_token(&self) -> Option<String> {
2512        self.inner.base_client.sync_token().await
2513    }
2514
2515    /// Gets information about the owner of a given access token.
2516    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2517        let request = whoami::v3::Request::new();
2518        self.send(request).await
2519    }
2520
2521    /// Subscribes a new receiver to client SessionChange broadcasts.
2522    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2523        let broadcast = &self.auth_ctx().session_change_sender;
2524        broadcast.subscribe()
2525    }
2526
2527    /// Sets the save/restore session callbacks.
2528    ///
2529    /// This is another mechanism to get synchronous updates to session tokens,
2530    /// while [`Self::subscribe_to_session_changes`] provides an async update.
2531    pub fn set_session_callbacks(
2532        &self,
2533        reload_session_callback: Box<ReloadSessionCallback>,
2534        save_session_callback: Box<SaveSessionCallback>,
2535    ) -> Result<()> {
2536        self.inner
2537            .auth_ctx
2538            .reload_session_callback
2539            .set(reload_session_callback)
2540            .map_err(|_| Error::MultipleSessionCallbacks)?;
2541
2542        self.inner
2543            .auth_ctx
2544            .save_session_callback
2545            .set(save_session_callback)
2546            .map_err(|_| Error::MultipleSessionCallbacks)?;
2547
2548        Ok(())
2549    }
2550
2551    /// Get the notification settings of the current owner of the client.
2552    pub async fn notification_settings(&self) -> NotificationSettings {
2553        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2554        NotificationSettings::new(self.clone(), ruleset)
2555    }
2556
2557    /// Create a new specialized `Client` that can process notifications.
2558    ///
2559    /// See [`CrossProcessStoreLock::new`] to learn more about
2560    /// `cross_process_store_locks_holder_name`.
2561    ///
2562    /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2563    pub async fn notification_client(
2564        &self,
2565        cross_process_store_locks_holder_name: String,
2566    ) -> Result<Client> {
2567        let client = Client {
2568            inner: ClientInner::new(
2569                self.inner.auth_ctx.clone(),
2570                self.server().cloned(),
2571                self.homeserver(),
2572                self.sliding_sync_version(),
2573                self.inner.http_client.clone(),
2574                self.inner
2575                    .base_client
2576                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2577                    .await?,
2578                self.inner.caches.server_info.read().await.clone(),
2579                self.inner.respect_login_well_known,
2580                self.inner.event_cache.clone(),
2581                self.inner.send_queue_data.clone(),
2582                #[cfg(feature = "e2e-encryption")]
2583                self.inner.e2ee.encryption_settings,
2584                #[cfg(feature = "e2e-encryption")]
2585                self.inner.enable_share_history_on_invite,
2586                cross_process_store_locks_holder_name,
2587            )
2588            .await,
2589        };
2590
2591        Ok(client)
2592    }
2593
2594    /// The [`EventCache`] instance for this [`Client`].
2595    pub fn event_cache(&self) -> &EventCache {
2596        // SAFETY: always initialized in the `Client` ctor.
2597        self.inner.event_cache.get().unwrap()
2598    }
2599
2600    /// Waits until an at least partially synced room is received, and returns
2601    /// it.
2602    ///
2603    /// **Note: this function will loop endlessly until either it finds the room
2604    /// or an externally set timeout happens.**
2605    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2606        loop {
2607            if let Some(room) = self.get_room(room_id) {
2608                if room.is_state_partially_or_fully_synced() {
2609                    debug!("Found just created room!");
2610                    return room;
2611                }
2612                debug!("Room wasn't partially synced, waiting for sync beat to try again");
2613            } else {
2614                debug!("Room wasn't found, waiting for sync beat to try again");
2615            }
2616            self.inner.sync_beat.listen().await;
2617        }
2618    }
2619
2620    /// Knock on a room given its `room_id_or_alias` to ask for permission to
2621    /// join it.
2622    pub async fn knock(
2623        &self,
2624        room_id_or_alias: OwnedRoomOrAliasId,
2625        reason: Option<String>,
2626        server_names: Vec<OwnedServerName>,
2627    ) -> Result<Room> {
2628        let request =
2629            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2630        let response = self.send(request).await?;
2631        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2632        Ok(Room::new(self.clone(), base_room))
2633    }
2634
2635    /// Checks whether the provided `user_id` belongs to an ignored user.
2636    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2637        self.base_client().is_user_ignored(user_id).await
2638    }
2639
2640    /// Gets the `max_upload_size` value from the homeserver, getting either a
2641    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2642    /// missing.
2643    ///
2644    /// Check the spec for more info:
2645    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2646    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2647        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2648        if let Some(data) = max_upload_size_lock.get() {
2649            return Ok(data.to_owned());
2650        }
2651
2652        let response = self
2653            .send(ruma::api::client::authenticated_media::get_media_config::v1::Request::default())
2654            .await?;
2655
2656        match max_upload_size_lock.set(response.upload_size) {
2657            Ok(_) => Ok(response.upload_size),
2658            Err(error) => {
2659                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2660            }
2661        }
2662    }
2663}
2664
2665/// A weak reference to the inner client, useful when trying to get a handle
2666/// on the owning client.
2667#[derive(Clone)]
2668pub(crate) struct WeakClient {
2669    client: Weak<ClientInner>,
2670}
2671
2672impl WeakClient {
2673    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2674    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2675        Self { client: Arc::downgrade(client) }
2676    }
2677
2678    /// Construct a [`WeakClient`] from a [`Client`].
2679    pub fn from_client(client: &Client) -> Self {
2680        Self::from_inner(&client.inner)
2681    }
2682
2683    /// Attempts to get a [`Client`] from this [`WeakClient`].
2684    pub fn get(&self) -> Option<Client> {
2685        self.client.upgrade().map(|inner| Client { inner })
2686    }
2687
2688    /// Gets the number of strong (`Arc`) pointers still pointing to this
2689    /// client.
2690    #[allow(dead_code)]
2691    pub fn strong_count(&self) -> usize {
2692        self.client.strong_count()
2693    }
2694}
2695
2696#[derive(Clone)]
2697struct ClientServerInfo {
2698    /// The Matrix versions the server supports (known ones only).
2699    server_versions: CachedValue<Box<[MatrixVersion]>>,
2700
2701    /// The unstable features and their on/off state on the server.
2702    unstable_features: CachedValue<BTreeMap<String, bool>>,
2703
2704    /// The server's well-known file, if any.
2705    well_known: CachedValue<Option<WellKnownResponse>>,
2706}
2707
2708/// A cached value that can either be set or not set, used to avoid confusion
2709/// between a value that is set to `None` (because it doesn't exist) and a value
2710/// that has not been cached yet.
2711#[derive(Clone)]
2712enum CachedValue<Value> {
2713    /// A value has been cached.
2714    Cached(Value),
2715    /// Nothing has been cached yet.
2716    NotSet,
2717}
2718
2719impl<Value> CachedValue<Value> {
2720    /// Unwraps the cached value, returning it if it exists.
2721    ///
2722    /// # Panics
2723    ///
2724    /// If the cached value is not set, this will panic.
2725    fn unwrap_cached_value(self) -> Value {
2726        match self {
2727            CachedValue::Cached(value) => value,
2728            CachedValue::NotSet => panic!("Tried to unwrap a cached value that wasn't set"),
2729        }
2730    }
2731}
2732
2733// The http mocking library is not supported for wasm32
2734#[cfg(all(test, not(target_family = "wasm")))]
2735pub(crate) mod tests {
2736    use std::{sync::Arc, time::Duration};
2737
2738    use assert_matches::assert_matches;
2739    use assert_matches2::assert_let;
2740    use eyeball::SharedObservable;
2741    use futures_util::{pin_mut, FutureExt};
2742    use js_int::{uint, UInt};
2743    use matrix_sdk_base::{
2744        store::{MemoryStore, StoreConfig},
2745        RoomState,
2746    };
2747    use matrix_sdk_test::{
2748        async_test, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
2749        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
2750    };
2751    #[cfg(target_family = "wasm")]
2752    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2753
2754    use ruma::{
2755        api::{
2756            client::{
2757                discovery::discover_homeserver::RtcFocusInfo,
2758                room::create_room::v3::Request as CreateRoomRequest,
2759            },
2760            MatrixVersion,
2761        },
2762        assign,
2763        events::{
2764            ignored_user_list::IgnoredUserListEventContent,
2765            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
2766        },
2767        owned_room_id, room_alias_id, room_id, RoomId, ServerName, UserId,
2768    };
2769    use serde_json::json;
2770    use stream_assert::{assert_next_matches, assert_pending};
2771    use tokio::{
2772        spawn,
2773        time::{sleep, timeout},
2774    };
2775    use url::Url;
2776    use wiremock::{
2777        matchers::{body_json, header, method, path, query_param_is_missing},
2778        Mock, MockServer, ResponseTemplate,
2779    };
2780
2781    use super::Client;
2782    use crate::{
2783        client::{futures::SendMediaUploadRequest, WeakClient},
2784        config::{RequestConfig, SyncSettings},
2785        futures::SendRequest,
2786        media::MediaError,
2787        test_utils::{
2788            logged_in_client, mocks::MatrixMockServer, no_retry_test_client, set_client_session,
2789            test_client_builder, test_client_builder_with_server,
2790        },
2791        Error, TransmissionProgress,
2792    };
2793
2794    #[async_test]
2795    async fn test_account_data() {
2796        let server = MockServer::start().await;
2797        let client = logged_in_client(Some(server.uri())).await;
2798
2799        Mock::given(method("GET"))
2800            .and(path("/_matrix/client/r0/sync".to_owned()))
2801            .and(header("authorization", "Bearer 1234"))
2802            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::SYNC))
2803            .mount(&server)
2804            .await;
2805
2806        let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
2807        let _response = client.sync_once(sync_settings).await.unwrap();
2808
2809        let content = client
2810            .account()
2811            .account_data::<IgnoredUserListEventContent>()
2812            .await
2813            .unwrap()
2814            .unwrap()
2815            .deserialize()
2816            .unwrap();
2817
2818        assert_eq!(content.ignored_users.len(), 1);
2819    }
2820
2821    #[async_test]
2822    async fn test_successful_discovery() {
2823        // Imagine this is `matrix.org`.
2824        let server = MockServer::start().await;
2825
2826        // Imagine this is `matrix-client.matrix.org`.
2827        let homeserver = MockServer::start().await;
2828
2829        // Imagine Alice has the user ID `@alice:matrix.org`.
2830        let server_url = server.uri();
2831        let domain = server_url.strip_prefix("http://").unwrap();
2832        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2833
2834        // The `.well-known` is on the server (e.g. `matrix.org`).
2835        Mock::given(method("GET"))
2836            .and(path("/.well-known/matrix/client"))
2837            .respond_with(ResponseTemplate::new(200).set_body_raw(
2838                test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", &homeserver.uri()),
2839                "application/json",
2840            ))
2841            .mount(&server)
2842            .await;
2843
2844        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
2845        Mock::given(method("GET"))
2846            .and(path("/_matrix/client/versions"))
2847            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2848            .mount(&homeserver)
2849            .await;
2850
2851        let client = Client::builder()
2852            .insecure_server_name_no_tls(alice.server_name())
2853            .build()
2854            .await
2855            .unwrap();
2856
2857        assert_eq!(client.server().unwrap(), &Url::parse(&server.uri()).unwrap());
2858        assert_eq!(client.homeserver(), Url::parse(&homeserver.uri()).unwrap());
2859    }
2860
2861    #[async_test]
2862    async fn test_discovery_broken_server() {
2863        let server = MockServer::start().await;
2864        let server_url = server.uri();
2865        let domain = server_url.strip_prefix("http://").unwrap();
2866        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2867
2868        Mock::given(method("GET"))
2869            .and(path("/.well-known/matrix/client"))
2870            .respond_with(ResponseTemplate::new(404))
2871            .mount(&server)
2872            .await;
2873
2874        assert!(
2875            Client::builder()
2876                .insecure_server_name_no_tls(alice.server_name())
2877                .build()
2878                .await
2879                .is_err(),
2880            "Creating a client from a user ID should fail when the .well-known request fails."
2881        );
2882    }
2883
2884    #[async_test]
2885    async fn test_room_creation() {
2886        let server = MockServer::start().await;
2887        let client = logged_in_client(Some(server.uri())).await;
2888
2889        let response = SyncResponseBuilder::default()
2890            .add_joined_room(
2891                JoinedRoomBuilder::default()
2892                    .add_state_event(StateTestEvent::Member)
2893                    .add_state_event(StateTestEvent::PowerLevels),
2894            )
2895            .build_sync_response();
2896
2897        client.inner.base_client.receive_sync_response(response).await.unwrap();
2898
2899        assert_eq!(client.homeserver(), Url::parse(&server.uri()).unwrap());
2900
2901        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
2902        assert_eq!(room.state(), RoomState::Joined);
2903    }
2904
2905    #[async_test]
2906    async fn test_retry_limit_http_requests() {
2907        let server = MockServer::start().await;
2908        let client = test_client_builder(Some(server.uri()))
2909            .request_config(RequestConfig::new().retry_limit(3))
2910            .build()
2911            .await
2912            .unwrap();
2913
2914        assert!(client.request_config().retry_limit.unwrap() == 3);
2915
2916        Mock::given(method("POST"))
2917            .and(path("/_matrix/client/r0/login"))
2918            .respond_with(ResponseTemplate::new(501))
2919            .expect(3)
2920            .mount(&server)
2921            .await;
2922
2923        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2924    }
2925
2926    #[async_test]
2927    async fn test_retry_timeout_http_requests() {
2928        // Keep this timeout small so that the test doesn't take long
2929        let retry_timeout = Duration::from_secs(5);
2930        let server = MockServer::start().await;
2931        let client = test_client_builder(Some(server.uri()))
2932            .request_config(RequestConfig::new().max_retry_time(retry_timeout))
2933            .build()
2934            .await
2935            .unwrap();
2936
2937        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
2938
2939        Mock::given(method("POST"))
2940            .and(path("/_matrix/client/r0/login"))
2941            .respond_with(ResponseTemplate::new(501))
2942            .expect(2..)
2943            .mount(&server)
2944            .await;
2945
2946        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2947    }
2948
2949    #[async_test]
2950    async fn test_short_retry_initial_http_requests() {
2951        let server = MockServer::start().await;
2952        let client = test_client_builder(Some(server.uri())).build().await.unwrap();
2953
2954        Mock::given(method("POST"))
2955            .and(path("/_matrix/client/r0/login"))
2956            .respond_with(ResponseTemplate::new(501))
2957            .expect(3..)
2958            .mount(&server)
2959            .await;
2960
2961        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2962    }
2963
2964    #[async_test]
2965    async fn test_no_retry_http_requests() {
2966        let server = MockServer::start().await;
2967        let client = logged_in_client(Some(server.uri())).await;
2968
2969        Mock::given(method("GET"))
2970            .and(path("/_matrix/client/r0/devices"))
2971            .respond_with(ResponseTemplate::new(501))
2972            .expect(1)
2973            .mount(&server)
2974            .await;
2975
2976        client.devices().await.unwrap_err();
2977    }
2978
2979    #[async_test]
2980    async fn test_set_homeserver() {
2981        let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2982        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
2983
2984        let homeserver = Url::parse("http://example.com/").unwrap();
2985        client.set_homeserver(homeserver.clone());
2986        assert_eq!(client.homeserver(), homeserver);
2987    }
2988
2989    #[async_test]
2990    async fn test_search_user_request() {
2991        let server = MockServer::start().await;
2992        let client = logged_in_client(Some(server.uri())).await;
2993
2994        Mock::given(method("POST"))
2995            .and(path("_matrix/client/r0/user_directory/search"))
2996            .and(body_json(&*test_json::search_users::SEARCH_USERS_REQUEST))
2997            .respond_with(
2998                ResponseTemplate::new(200)
2999                    .set_body_json(&*test_json::search_users::SEARCH_USERS_RESPONSE),
3000            )
3001            .mount(&server)
3002            .await;
3003
3004        let response = client.search_users("test", 50).await.unwrap();
3005        assert_eq!(response.results.len(), 1);
3006        let result = &response.results[0];
3007        assert_eq!(result.user_id.to_string(), "@test:example.me");
3008        assert_eq!(result.display_name.clone().unwrap(), "Test");
3009        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3010        assert!(!response.limited);
3011    }
3012
3013    #[async_test]
3014    async fn test_request_unstable_features() {
3015        let server = MockServer::start().await;
3016        let client = logged_in_client(Some(server.uri())).await;
3017
3018        Mock::given(method("GET"))
3019            .and(path("_matrix/client/versions"))
3020            .respond_with(
3021                ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
3022            )
3023            .mount(&server)
3024            .await;
3025
3026        let unstable_features = client.unstable_features().await.unwrap();
3027        assert_eq!(unstable_features.get("org.matrix.e2e_cross_signing"), Some(&true));
3028        assert_eq!(unstable_features.get("you.shall.pass"), None);
3029    }
3030
3031    #[async_test]
3032    async fn test_can_homeserver_push_encrypted_event_to_device() {
3033        let server = MockServer::start().await;
3034        let client = logged_in_client(Some(server.uri())).await;
3035
3036        Mock::given(method("GET"))
3037            .and(path("_matrix/client/versions"))
3038            .respond_with(
3039                ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
3040            )
3041            .mount(&server)
3042            .await;
3043
3044        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3045        assert!(msc4028_enabled);
3046    }
3047
3048    #[async_test]
3049    async fn test_recently_visited_rooms() {
3050        // Tracking recently visited rooms requires authentication
3051        let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
3052        assert_matches!(
3053            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3054            Err(Error::AuthenticationRequired)
3055        );
3056
3057        let client = logged_in_client(None).await;
3058        let account = client.account();
3059
3060        // We should start off with an empty list
3061        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3062
3063        // Tracking a valid room id should add it to the list
3064        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3065        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3066        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3067
3068        // And the existing list shouldn't be changed
3069        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3070        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3071
3072        // Tracking the same room again shouldn't change the list
3073        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3074        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3075        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3076
3077        // Tracking a second room should add it to the front of the list
3078        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3079        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3080        assert_eq!(
3081            account.get_recently_visited_rooms().await.unwrap(),
3082            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3083        );
3084
3085        // Tracking the first room yet again should move it to the front of the list
3086        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3087        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3088        assert_eq!(
3089            account.get_recently_visited_rooms().await.unwrap(),
3090            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3091        );
3092
3093        // Tracking should be capped at 20
3094        for n in 0..20 {
3095            account
3096                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3097                .await
3098                .unwrap();
3099        }
3100
3101        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3102
3103        // And the initial rooms should've been pushed out
3104        let rooms = account.get_recently_visited_rooms().await.unwrap();
3105        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3106        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3107
3108        // And the last tracked room should be the first
3109        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3110    }
3111
3112    #[async_test]
3113    async fn test_client_no_cycle_with_event_cache() {
3114        let client = logged_in_client(None).await;
3115
3116        // Wait for the init tasks to die.
3117        sleep(Duration::from_secs(1)).await;
3118
3119        let weak_client = WeakClient::from_client(&client);
3120        assert_eq!(weak_client.strong_count(), 1);
3121
3122        {
3123            let room_id = room_id!("!room:example.org");
3124
3125            // Have the client know the room.
3126            let response = SyncResponseBuilder::default()
3127                .add_joined_room(JoinedRoomBuilder::new(room_id))
3128                .build_sync_response();
3129            client.inner.base_client.receive_sync_response(response).await.unwrap();
3130
3131            client.event_cache().subscribe().unwrap();
3132
3133            let (_room_event_cache, _drop_handles) =
3134                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3135        }
3136
3137        drop(client);
3138
3139        // Give a bit of time for background tasks to die.
3140        sleep(Duration::from_secs(1)).await;
3141
3142        // The weak client must be the last reference to the client now.
3143        assert_eq!(weak_client.strong_count(), 0);
3144        let client = weak_client.get();
3145        assert!(
3146            client.is_none(),
3147            "too many strong references to the client: {}",
3148            Arc::strong_count(&client.unwrap().inner)
3149        );
3150    }
3151
3152    #[async_test]
3153    async fn test_server_info_caching() {
3154        let server = MockServer::start().await;
3155        let server_url = server.uri();
3156        let domain = server_url.strip_prefix("http://").unwrap();
3157        let server_name = <&ServerName>::try_from(domain).unwrap();
3158        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3159
3160        let well_known_mock = Mock::given(method("GET"))
3161            .and(path("/.well-known/matrix/client"))
3162            .respond_with(ResponseTemplate::new(200).set_body_raw(
3163                test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", server_url.as_ref()),
3164                "application/json",
3165            ))
3166            .named("well known mock")
3167            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3168            .mount_as_scoped(&server)
3169            .await;
3170
3171        let versions_mock = Mock::given(method("GET"))
3172            .and(path("/_matrix/client/versions"))
3173            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3174            .named("first versions mock")
3175            .expect(1)
3176            .mount_as_scoped(&server)
3177            .await;
3178
3179        let memory_store = Arc::new(MemoryStore::new());
3180        let client = Client::builder()
3181            .insecure_server_name_no_tls(server_name)
3182            .store_config(
3183                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3184                    .state_store(memory_store.clone()),
3185            )
3186            .build()
3187            .await
3188            .unwrap();
3189
3190        assert_eq!(client.server_versions().await.unwrap().len(), 1);
3191
3192        // These subsequent calls hit the in-memory cache.
3193        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3194        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3195
3196        drop(client);
3197
3198        let client = Client::builder()
3199            .homeserver_url(server.uri()) // Configure this client directly so as to not hit the discovery endpoint.
3200            .store_config(
3201                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3202                    .state_store(memory_store.clone()),
3203            )
3204            .build()
3205            .await
3206            .unwrap();
3207
3208        // This call to the new client hits the on-disk cache.
3209        assert_eq!(
3210            client.unstable_features().await.unwrap().get("org.matrix.e2e_cross_signing"),
3211            Some(&true)
3212        );
3213
3214        // Then this call hits the in-memory cache.
3215        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3216
3217        drop(versions_mock);
3218        drop(well_known_mock);
3219        server.verify().await;
3220
3221        // Now, reset the cache, and observe the endpoints being called again once.
3222        client.reset_server_info().await.unwrap();
3223
3224        Mock::given(method("GET"))
3225            .and(path("/.well-known/matrix/client"))
3226            .respond_with(ResponseTemplate::new(200).set_body_raw(
3227                test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", server_url.as_ref()),
3228                "application/json",
3229            ))
3230            .named("second well known mock")
3231            .expect(1)
3232            .mount(&server)
3233            .await;
3234
3235        Mock::given(method("GET"))
3236            .and(path("/_matrix/client/versions"))
3237            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3238            .expect(1)
3239            .named("second versions mock")
3240            .mount(&server)
3241            .await;
3242
3243        // Hits network again.
3244        assert_eq!(client.server_versions().await.unwrap().len(), 1);
3245        // Hits in-memory cache again.
3246        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3247        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3248    }
3249
3250    #[async_test]
3251    async fn test_server_info_without_a_well_known() {
3252        let server = MockServer::start().await;
3253        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3254
3255        let versions_mock = Mock::given(method("GET"))
3256            .and(path("/_matrix/client/versions"))
3257            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3258            .named("first versions mock")
3259            .expect(1)
3260            .mount_as_scoped(&server)
3261            .await;
3262
3263        let memory_store = Arc::new(MemoryStore::new());
3264        let client = Client::builder()
3265            .homeserver_url(server.uri()) // Configure this client directly so as to not hit the discovery endpoint.
3266            .store_config(
3267                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3268                    .state_store(memory_store.clone()),
3269            )
3270            .build()
3271            .await
3272            .unwrap();
3273
3274        assert_eq!(client.server_versions().await.unwrap().len(), 1);
3275
3276        // These subsequent calls hit the in-memory cache.
3277        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3278        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3279
3280        drop(client);
3281
3282        let client = Client::builder()
3283            .homeserver_url(server.uri()) // Configure this client directly so as to not hit the discovery endpoint.
3284            .store_config(
3285                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3286                    .state_store(memory_store.clone()),
3287            )
3288            .build()
3289            .await
3290            .unwrap();
3291
3292        // This call to the new client hits the on-disk cache.
3293        assert_eq!(
3294            client.unstable_features().await.unwrap().get("org.matrix.e2e_cross_signing"),
3295            Some(&true)
3296        );
3297
3298        // Then this call hits the in-memory cache.
3299        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3300
3301        drop(versions_mock);
3302        server.verify().await;
3303
3304        // Now, reset the cache, and observe the endpoints being called again once.
3305        client.reset_server_info().await.unwrap();
3306
3307        Mock::given(method("GET"))
3308            .and(path("/_matrix/client/versions"))
3309            .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3310            .expect(1)
3311            .named("second versions mock")
3312            .mount(&server)
3313            .await;
3314
3315        // Hits network again.
3316        assert_eq!(client.server_versions().await.unwrap().len(), 1);
3317        // Hits in-memory cache again.
3318        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3319        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3320    }
3321
3322    #[async_test]
3323    async fn test_no_network_doesnt_cause_infinite_retries() {
3324        // Note: not `no_retry_test_client` or `logged_in_client` which uses the former,
3325        // since we want infinite retries for transient errors.
3326        let client =
3327            test_client_builder(None).request_config(RequestConfig::new()).build().await.unwrap();
3328        set_client_session(&client).await;
3329
3330        // We don't define a mock server on purpose here, so that the error is really a
3331        // network error.
3332        client.whoami().await.unwrap_err();
3333    }
3334
3335    #[async_test]
3336    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3337        let (client_builder, server) = test_client_builder_with_server().await;
3338        let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3339        set_client_session(&client).await;
3340
3341        let builder = Mock::given(method("GET"))
3342            .and(path("/_matrix/client/r0/sync"))
3343            .and(header("authorization", "Bearer 1234"))
3344            .and(query_param_is_missing("since"));
3345
3346        let room_id = room_id!("!room:example.org");
3347        let joined_room_builder = JoinedRoomBuilder::new(room_id);
3348        let mut sync_response_builder = SyncResponseBuilder::new();
3349        sync_response_builder.add_joined_room(joined_room_builder);
3350        let response_body = sync_response_builder.build_json_sync_response();
3351
3352        builder
3353            .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3354            .mount(&server)
3355            .await;
3356
3357        client.sync_once(SyncSettings::default()).await.unwrap();
3358
3359        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3360        assert_eq!(room.room_id(), room_id);
3361    }
3362
3363    #[async_test]
3364    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3365        let (client_builder, server) = test_client_builder_with_server().await;
3366        let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3367        set_client_session(&client).await;
3368
3369        let builder = Mock::given(method("GET"))
3370            .and(path("/_matrix/client/r0/sync"))
3371            .and(header("authorization", "Bearer 1234"))
3372            .and(query_param_is_missing("since"));
3373
3374        let room_id = room_id!("!room:example.org");
3375        let joined_room_builder = JoinedRoomBuilder::new(room_id);
3376        let mut sync_response_builder = SyncResponseBuilder::new();
3377        sync_response_builder.add_joined_room(joined_room_builder);
3378        let response_body = sync_response_builder.build_json_sync_response();
3379
3380        builder
3381            .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3382            .mount(&server)
3383            .await;
3384
3385        let client = Arc::new(client);
3386
3387        // Perform the /sync request with a delay so it starts after the
3388        // `await_room_remote_echo` call has happened
3389        spawn({
3390            let client = client.clone();
3391            async move {
3392                sleep(Duration::from_millis(100)).await;
3393                client.sync_once(SyncSettings::default()).await.unwrap();
3394            }
3395        });
3396
3397        let room =
3398            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3399        assert_eq!(room.room_id(), room_id);
3400    }
3401
3402    #[async_test]
3403    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3404        let (client_builder, _) = test_client_builder_with_server().await;
3405        let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3406        set_client_session(&client).await;
3407
3408        let room_id = room_id!("!room:example.org");
3409        // Room is not present so the client won't be able to find it. The call will
3410        // timeout.
3411        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3412    }
3413
3414    #[async_test]
3415    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3416        let (client_builder, server) = test_client_builder_with_server().await;
3417        let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3418        set_client_session(&client).await;
3419
3420        Mock::given(method("POST"))
3421            .and(path("_matrix/client/r0/createRoom"))
3422            .and(header("authorization", "Bearer 1234"))
3423            .respond_with(
3424                ResponseTemplate::new(200).set_body_json(json!({ "room_id": "!room:example.org"})),
3425            )
3426            .mount(&server)
3427            .await;
3428
3429        // Create a room in the internal store
3430        let room = client
3431            .create_room(assign!(CreateRoomRequest::new(), {
3432                invite: vec![],
3433                is_direct: false,
3434            }))
3435            .await
3436            .unwrap();
3437
3438        // Room is locally present, but not synced, the call will timeout
3439        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3440            .await
3441            .unwrap_err();
3442    }
3443
3444    #[async_test]
3445    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3446        let server = MatrixMockServer::new().await;
3447        let client = server.client_builder().build().await;
3448
3449        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3450
3451        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3452        assert_matches!(ret, Ok(true));
3453    }
3454
3455    #[async_test]
3456    async fn test_is_room_alias_available_if_alias_is_resolved() {
3457        let server = MatrixMockServer::new().await;
3458        let client = server.client_builder().build().await;
3459
3460        server
3461            .mock_room_directory_resolve_alias()
3462            .ok("!some_room_id:matrix.org", Vec::new())
3463            .expect(1)
3464            .mount()
3465            .await;
3466
3467        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3468        assert_matches!(ret, Ok(false));
3469    }
3470
3471    #[async_test]
3472    async fn test_is_room_alias_available_if_error_found() {
3473        let server = MatrixMockServer::new().await;
3474        let client = server.client_builder().build().await;
3475
3476        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3477
3478        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3479        assert_matches!(ret, Err(_));
3480    }
3481
3482    #[async_test]
3483    async fn test_create_room_alias() {
3484        let server = MatrixMockServer::new().await;
3485        let client = server.client_builder().build().await;
3486
3487        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3488
3489        let ret = client
3490            .create_room_alias(
3491                room_alias_id!("#some_alias:matrix.org"),
3492                room_id!("!some_room:matrix.org"),
3493            )
3494            .await;
3495        assert_matches!(ret, Ok(()));
3496    }
3497
3498    #[async_test]
3499    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3500        let server = MatrixMockServer::new().await;
3501        let client = server.client_builder().build().await;
3502
3503        let room_id = room_id!("!a-room:matrix.org");
3504
3505        // Make sure the summary endpoint is called once
3506        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3507
3508        // We create a locally cached invited room
3509        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3510
3511        // And we get a preview, the server endpoint was reached
3512        let preview = client
3513            .get_room_preview(room_id.into(), Vec::new())
3514            .await
3515            .expect("Room preview should be retrieved");
3516
3517        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3518    }
3519
3520    #[async_test]
3521    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3522        let server = MatrixMockServer::new().await;
3523        let client = server.client_builder().build().await;
3524
3525        let room_id = room_id!("!a-room:matrix.org");
3526
3527        // Make sure the summary endpoint is called once
3528        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3529
3530        // We create a locally cached left room
3531        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3532
3533        // And we get a preview, the server endpoint was reached
3534        let preview = client
3535            .get_room_preview(room_id.into(), Vec::new())
3536            .await
3537            .expect("Room preview should be retrieved");
3538
3539        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3540    }
3541
3542    #[async_test]
3543    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3544        let server = MatrixMockServer::new().await;
3545        let client = server.client_builder().build().await;
3546
3547        let room_id = room_id!("!a-room:matrix.org");
3548
3549        // Make sure the summary endpoint is called once
3550        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3551
3552        // We create a locally cached knocked room
3553        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3554
3555        // And we get a preview, the server endpoint was reached
3556        let preview = client
3557            .get_room_preview(room_id.into(), Vec::new())
3558            .await
3559            .expect("Room preview should be retrieved");
3560
3561        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3562    }
3563
3564    #[async_test]
3565    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3566        let server = MatrixMockServer::new().await;
3567        let client = server.client_builder().build().await;
3568
3569        let room_id = room_id!("!a-room:matrix.org");
3570
3571        // Make sure the summary endpoint is not called
3572        server.mock_room_summary().ok(room_id).never().mount().await;
3573
3574        // We create a locally cached joined room
3575        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3576
3577        // And we get a preview, no server endpoint was reached
3578        let preview = client
3579            .get_room_preview(room_id.into(), Vec::new())
3580            .await
3581            .expect("Room preview should be retrieved");
3582
3583        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3584    }
3585
3586    #[async_test]
3587    async fn test_media_preview_config() {
3588        let server = MatrixMockServer::new().await;
3589        let client = server.client_builder().build().await;
3590
3591        server
3592            .mock_sync()
3593            .ok_and_run(&client, |builder| {
3594                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3595                "content": {
3596                    "media_previews": "private",
3597                    "invite_avatars": "off"
3598                },
3599                "type": "m.media_preview_config"
3600                  })));
3601            })
3602            .await;
3603
3604        let (initial_value, stream) =
3605            client.account().observe_media_preview_config().await.unwrap();
3606
3607        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3608        assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3609        assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3610        pin_mut!(stream);
3611        assert_pending!(stream);
3612
3613        server
3614            .mock_sync()
3615            .ok_and_run(&client, |builder| {
3616                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3617                "content": {
3618                    "media_previews": "off",
3619                    "invite_avatars": "on"
3620                },
3621                "type": "m.media_preview_config"
3622                  })));
3623            })
3624            .await;
3625
3626        assert_next_matches!(
3627            stream,
3628            MediaPreviewConfigEventContent {
3629                media_previews: MediaPreviews::Off,
3630                invite_avatars: InviteAvatars::On,
3631                ..
3632            }
3633        );
3634        assert_pending!(stream);
3635    }
3636
3637    #[async_test]
3638    async fn test_unstable_media_preview_config() {
3639        let server = MatrixMockServer::new().await;
3640        let client = server.client_builder().build().await;
3641
3642        server
3643            .mock_sync()
3644            .ok_and_run(&client, |builder| {
3645                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3646                "content": {
3647                    "media_previews": "private",
3648                    "invite_avatars": "off"
3649                },
3650                "type": "io.element.msc4278.media_preview_config"
3651                  })));
3652            })
3653            .await;
3654
3655        let (initial_value, stream) =
3656            client.account().observe_media_preview_config().await.unwrap();
3657
3658        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3659        assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3660        assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3661        pin_mut!(stream);
3662        assert_pending!(stream);
3663
3664        server
3665            .mock_sync()
3666            .ok_and_run(&client, |builder| {
3667                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3668                "content": {
3669                    "media_previews": "off",
3670                    "invite_avatars": "on"
3671                },
3672                "type": "io.element.msc4278.media_preview_config"
3673                  })));
3674            })
3675            .await;
3676
3677        assert_next_matches!(
3678            stream,
3679            MediaPreviewConfigEventContent {
3680                media_previews: MediaPreviews::Off,
3681                invite_avatars: InviteAvatars::On,
3682                ..
3683            }
3684        );
3685        assert_pending!(stream);
3686    }
3687
3688    #[async_test]
3689    async fn test_media_preview_config_not_found() {
3690        let server = MatrixMockServer::new().await;
3691        let client = server.client_builder().build().await;
3692
3693        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3694
3695        assert!(initial_value.is_none());
3696    }
3697
3698    #[async_test]
3699    async fn test_load_or_fetch_max_upload_size() {
3700        let server = MatrixMockServer::new().await;
3701        let client = server.client_builder().build().await;
3702
3703        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3704
3705        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3706        client.load_or_fetch_max_upload_size().await.unwrap();
3707
3708        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3709    }
3710
3711    #[async_test]
3712    async fn test_uploading_a_too_large_media_file() {
3713        let server = MatrixMockServer::new().await;
3714        let client = server.client_builder().build().await;
3715
3716        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3717        client.load_or_fetch_max_upload_size().await.unwrap();
3718        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3719
3720        let data = vec![1, 2];
3721        let upload_request =
3722            ruma::api::client::media::create_content::v3::Request::new(data.clone());
3723        let request = SendRequest {
3724            client: client.clone(),
3725            request: upload_request,
3726            config: None,
3727            send_progress: SharedObservable::new(TransmissionProgress::default()),
3728        };
3729        let media_request = SendMediaUploadRequest::new(request);
3730
3731        let error = media_request.await.err();
3732        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
3733        assert_eq!(max, uint!(1));
3734        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
3735    }
3736}