matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
33use matrix_sdk_base::{
34    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
35    StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
36    event_cache::store::EventCacheStoreLock,
37    media::store::MediaStoreLock,
38    store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
39    sync::{Notification, RoomUpdates},
40};
41use matrix_sdk_common::ttl_cache::TtlCache;
42#[cfg(feature = "e2e-encryption")]
43use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
44use ruma::{
45    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
46    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
47    api::{
48        FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
49        client::{
50            account::whoami,
51            alias::{create_alias, delete_alias, get_alias},
52            authenticated_media,
53            device::{delete_devices, get_devices, update_device},
54            directory::{get_public_rooms, get_public_rooms_filtered},
55            discovery::{
56                discover_homeserver::{self, RtcFocusInfo},
57                get_capabilities::{self, v3::Capabilities},
58                get_supported_versions,
59            },
60            error::ErrorKind,
61            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
62            knock::knock_room,
63            media,
64            membership::{join_room_by_id, join_room_by_id_or_alias},
65            room::create_room,
66            session::login::v3::DiscoveryInfo,
67            sync::sync_events,
68            threads::get_thread_subscriptions_changes,
69            uiaa,
70            user_directory::search_users,
71        },
72        error::FromHttpResponseError,
73        federation::discovery::get_server_version,
74    },
75    assign,
76    push::Ruleset,
77    time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, instrument, trace, warn};
82use url::Url;
83
84use self::futures::SendRequest;
85use crate::{
86    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
87    Room, SessionTokens, TransmissionProgress,
88    authentication::{
89        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
90        oauth::OAuth,
91    },
92    client::thread_subscriptions::ThreadSubscriptionCatchup,
93    config::{RequestConfig, SyncToken},
94    deduplicating_handler::DeduplicatingHandler,
95    error::HttpResult,
96    event_cache::EventCache,
97    event_handler::{
98        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
99        EventHandlerStore, ObservableEventHandler, SyncEvent,
100    },
101    http_client::HttpClient,
102    latest_events::LatestEvents,
103    media::MediaError,
104    notification_settings::NotificationSettings,
105    room::RoomMember,
106    room_preview::RoomPreview,
107    send_queue::{SendQueue, SendQueueData},
108    sliding_sync::Version as SlidingSyncVersion,
109    sync::{RoomUpdate, SyncResponse},
110};
111#[cfg(feature = "e2e-encryption")]
112use crate::{
113    cross_process_lock::CrossProcessLock,
114    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
115};
116
117mod builder;
118pub(crate) mod caches;
119pub(crate) mod futures;
120pub(crate) mod thread_subscriptions;
121
122pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
123#[cfg(feature = "experimental-search")]
124use crate::search_index::SearchIndex;
125
/// Boxed, pinned future produced by a notification handler.
///
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future produced by a notification handler (wasm variant,
/// without the `Send` bound).
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

/// Boxed notification handler: a callback taking the [`Notification`], the
/// [`Room`] and the [`Client`], and returning a [`NotificationHandlerFut`].
///
/// On non-wasm targets the callback must be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler (wasm variant, without the `Send + Sync`
/// bounds).
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
136
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method, the return value
/// of the provided callback controls if the sync loop should be exited.
///
/// [`sync_with_callback`]: Client::sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
150
/// Represents changes that can occur to a `Client`'s `Session`.
///
/// Every variant only carries plain data, so the type implements full
/// equality (`Eq`) in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
162
/// Information about the server vendor obtained from the federation API.
///
/// Returned by [`Client::server_vendor_info`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The name of the server software, or `"unknown"` if the server did not
    /// report one.
    pub server_name: String,
    /// The version of the server software, or `"unknown"` if the server did
    /// not report one.
    pub version: String,
}
172
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
#[derive(Clone)]
pub struct Client {
    /// The shared state; cloning a `Client` only clones this `Arc`, so all
    /// clones observe the same `ClientInner`.
    pub(crate) inner: Arc<ClientInner>,
}
180
/// Locks and request-deduplication handlers used by individual client
/// methods.
///
/// All fields start out in their `Default` state.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Room::send_single_receipt`].
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// The cross-process lock guarding the crypto store.
    ///
    /// Stored in a `OnceCell`, so it is set at most once.
    // NOTE(review): presumably only initialized when cross-process crypto
    // store locking is enabled — confirm against the encryption module.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
259
/// The shared state behind a [`Client`].
///
/// A [`Client`] is only an `Arc` around this struct, so every clone of a
/// client reads and writes the same `ClientInner`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    /// Handler for [`RoomIndex`]'s of each room
    #[cfg(feature = "experimental-search")]
    search_index: SearchIndex,
}
380
impl ClientInner {
    /// Create a new `ClientInner`.
    ///
    /// All the fields passed as parameters here are those that must be cloned
    /// upon instantiation of a sub-client, e.g. a client specialized for
    /// notifications.
    ///
    /// The remaining state (locks, typing notice times, event and
    /// notification handlers, room update channels, sync beat, …) is created
    /// fresh for every `ClientInner`.
    ///
    /// The value is returned already wrapped in an `Arc`, because some of the
    /// initialization steps below need a handle to the shared instance.
    #[allow(clippy::too_many_arguments)]
    async fn new(
        auth_ctx: Arc<AuthCtx>,
        server: Option<Url>,
        homeserver: Url,
        sliding_sync_version: SlidingSyncVersion,
        http_client: HttpClient,
        base_client: BaseClient,
        server_info: ClientServerInfo,
        respect_login_well_known: bool,
        event_cache: OnceCell<EventCache>,
        send_queue: Arc<SendQueueData>,
        latest_events: OnceCell<LatestEvents>,
        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
        cross_process_store_locks_holder_name: String,
        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
    ) -> Arc<Self> {
        // The caches start from the provided server info and an empty
        // server metadata cache.
        let caches = ClientCaches {
            server_info: server_info.into(),
            server_metadata: Mutex::new(TtlCache::new()),
        };

        let client = Self {
            server,
            homeserver: StdRwLock::new(homeserver),
            auth_ctx,
            sliding_sync_version: StdRwLock::new(sliding_sync_version),
            http_client,
            base_client,
            caches,
            locks: Default::default(),
            cross_process_store_locks_holder_name,
            typing_notice_times: Default::default(),
            event_handlers: Default::default(),
            notification_handlers: Default::default(),
            room_update_channels: Default::default(),
            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
            // ballast for all observers to catch up.
            room_updates_sender: broadcast::Sender::new(32),
            respect_login_well_known,
            sync_beat: event_listener::Event::new(),
            event_cache,
            send_queue_data: send_queue,
            latest_events,
            #[cfg(feature = "e2e-encryption")]
            e2ee: EncryptionData::new(encryption_settings),
            #[cfg(feature = "e2e-encryption")]
            verification_state: SharedObservable::new(VerificationState::Unknown),
            #[cfg(feature = "e2e-encryption")]
            enable_share_history_on_invite,
            server_max_upload_size: Mutex::new(OnceCell::new()),
            #[cfg(feature = "experimental-search")]
            search_index: search_index_handler,
            thread_subscription_catchup,
        };

        // NOTE(review): the `let_and_return` allowance looks like a leftover
        // from when this binding could be immediately followed by the
        // returned `client` — confirm before removing it.
        #[allow(clippy::let_and_return)]
        let client = Arc::new(client);

        // The encryption tasks receive a reference to the shared instance, so
        // they can only be set up once the `Arc` exists.
        #[cfg(feature = "e2e-encryption")]
        client.e2ee.initialize_tasks(&client);

        // Only create an event cache if the provided cell is still empty;
        // the event cache gets a weak handle to the client.
        let _ = client
            .event_cache
            .get_or_init(|| async {
                EventCache::new(
                    WeakClient::from_inner(&client),
                    client.base_client.event_cache_store().clone(),
                )
            })
            .await;

        // Likewise, only create the thread subscription catch-up service if
        // the provided cell is still empty.
        let _ = client
            .thread_subscription_catchup
            .get_or_init(|| async {
                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
            })
            .await;

        client
    }
}
471
472#[cfg(not(tarpaulin_include))]
473impl Debug for Client {
474    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
475        write!(fmt, "Client")
476    }
477}
478
479impl Client {
480    /// Create a new [`Client`] that will use the given homeserver.
481    ///
482    /// # Arguments
483    ///
484    /// * `homeserver_url` - The homeserver that the client should connect to.
485    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
486        Self::builder().homeserver_url(homeserver_url).build().await
487    }
488
489    /// Returns a subscriber that publishes an event every time the ignore user
490    /// list changes.
491    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
492        self.inner.base_client.subscribe_to_ignore_user_list_changes()
493    }
494
    /// Create a new [`ClientBuilder`].
    ///
    /// The builder is used to configure and create a [`Client`]; see
    /// [`Client::new`] for the shortcut that only sets the homeserver URL.
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
499
    /// Get a reference to the underlying [`BaseClient`].
    pub(crate) fn base_client(&self) -> &BaseClient {
        &self.inner.base_client
    }
503
    /// The underlying HTTP client.
    ///
    /// This is the `reqwest` client wrapped by the SDK's own HTTP layer.
    pub fn http_client(&self) -> &reqwest::Client {
        &self.inner.http_client.inner
    }
508
    /// Get the collection of locks and deduplicating handlers shared by the
    /// methods of this client.
    pub(crate) fn locks(&self) -> &ClientLocks {
        &self.inner.locks
    }
512
    /// Get the data related to authentication and authorization.
    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
        &self.inner.auth_ctx
    }
516
517    /// The cross-process store locks holder name.
518    ///
519    /// The SDK provides cross-process store locks (see
520    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
521    /// `holder_name` is the value used for all cross-process store locks
522    /// used by this `Client`.
523    pub fn cross_process_store_locks_holder_name(&self) -> &str {
524        &self.inner.cross_process_store_locks_holder_name
525    }
526
527    /// Change the homeserver URL used by this client.
528    ///
529    /// # Arguments
530    ///
531    /// * `homeserver_url` - The new URL to use.
532    fn set_homeserver(&self, homeserver_url: Url) {
533        *self.inner.homeserver.write().unwrap() = homeserver_url;
534    }
535
536    /// Get the capabilities of the homeserver.
537    ///
538    /// This method should be used to check what features are supported by the
539    /// homeserver.
540    ///
541    /// # Examples
542    ///
543    /// ```no_run
544    /// # use matrix_sdk::Client;
545    /// # use url::Url;
546    /// # async {
547    /// # let homeserver = Url::parse("http://example.com")?;
548    /// let client = Client::new(homeserver).await?;
549    ///
550    /// let capabilities = client.get_capabilities().await?;
551    ///
552    /// if capabilities.change_password.enabled {
553    ///     // Change password
554    /// }
555    /// # anyhow::Ok(()) };
556    /// ```
557    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
558        let res = self.send(get_capabilities::v3::Request::new()).await?;
559        Ok(res.capabilities)
560    }
561
562    /// Get the server vendor information from the federation API.
563    ///
564    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
565    /// both the server's software name and version.
566    ///
567    /// # Examples
568    ///
569    /// ```no_run
570    /// # use matrix_sdk::Client;
571    /// # use url::Url;
572    /// # async {
573    /// # let homeserver = Url::parse("http://example.com")?;
574    /// let client = Client::new(homeserver).await?;
575    ///
576    /// let server_info = client.server_vendor_info(None).await?;
577    /// println!(
578    ///     "Server: {}, Version: {}",
579    ///     server_info.server_name, server_info.version
580    /// );
581    /// # anyhow::Ok(()) };
582    /// ```
583    pub async fn server_vendor_info(
584        &self,
585        request_config: Option<RequestConfig>,
586    ) -> HttpResult<ServerVendorInfo> {
587        let res = self
588            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
589            .await?;
590
591        // Extract server info, using defaults if fields are missing.
592        let server = res.server.unwrap_or_default();
593        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
594        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
595
596        Ok(ServerVendorInfo { server_name: server_name_str, version })
597    }
598
599    /// Get a copy of the default request config.
600    ///
601    /// The default request config is what's used when sending requests if no
602    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
603    /// function with such a parameter.
604    ///
605    /// If the default request config was not customized through
606    /// [`ClientBuilder`] when creating this `Client`, the returned value will
607    /// be equivalent to [`RequestConfig::default()`].
608    pub fn request_config(&self) -> RequestConfig {
609        self.inner.http_client.request_config
610    }
611
612    /// Check whether the client has been activated.
613    ///
614    /// A client is considered active when:
615    ///
616    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
617    ///    is logged in,
618    /// 2. Has loaded cached data from storage,
619    /// 3. If encryption is enabled, it also initialized or restored its
620    ///    `OlmMachine`.
621    pub fn is_active(&self) -> bool {
622        self.inner.base_client.is_active()
623    }
624
    /// The server used by the client.
    ///
    /// Not to be confused with [`Client::homeserver`]: the server is usually
    /// the server part of a user ID, whereas the homeserver is the URL of the
    /// client-server API.
    ///
    /// Returns `None` when the server is not known, e.g. because the client
    /// has been built from a homeserver URL directly.
    pub fn server(&self) -> Option<&Url> {
        self.inner.server.as_ref()
    }
631
632    /// The homeserver of the client.
633    pub fn homeserver(&self) -> Url {
634        self.inner.homeserver.read().unwrap().clone()
635    }
636
637    /// Get the sliding sync version.
638    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
639        self.inner.sliding_sync_version.read().unwrap().clone()
640    }
641
642    /// Override the sliding sync version.
643    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
644        let mut lock = self.inner.sliding_sync_version.write().unwrap();
645        *lock = version;
646    }
647
648    /// Get the Matrix user session meta information.
649    ///
650    /// If the client is currently logged in, this will return a
651    /// [`SessionMeta`] object which contains the user ID and device ID.
652    /// Otherwise it returns `None`.
653    pub fn session_meta(&self) -> Option<&SessionMeta> {
654        self.base_client().session_meta()
655    }
656
657    /// Returns a receiver that gets events for each room info update. To watch
658    /// for new events, use `receiver.resubscribe()`.
659    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
660        self.base_client().room_info_notable_update_receiver()
661    }
662
663    /// Performs a search for users.
664    /// The search is performed case-insensitively on user IDs and display names
665    ///
666    /// # Arguments
667    ///
668    /// * `search_term` - The search term for the search
669    /// * `limit` - The maximum number of results to return. Defaults to 10.
670    ///
671    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
672    pub async fn search_users(
673        &self,
674        search_term: &str,
675        limit: u64,
676    ) -> HttpResult<search_users::v3::Response> {
677        let mut request = search_users::v3::Request::new(search_term.to_owned());
678
679        if let Some(limit) = UInt::new(limit) {
680            request.limit = limit;
681        }
682
683        self.send(request).await
684    }
685
686    /// Get the user id of the current owner of the client.
687    pub fn user_id(&self) -> Option<&UserId> {
688        self.session_meta().map(|s| s.user_id.as_ref())
689    }
690
691    /// Get the device ID that identifies the current session.
692    pub fn device_id(&self) -> Option<&DeviceId> {
693        self.session_meta().map(|s| s.device_id.as_ref())
694    }
695
696    /// Get the current access token for this session.
697    ///
698    /// Will be `None` if the client has not been logged in.
699    pub fn access_token(&self) -> Option<String> {
700        self.auth_ctx().access_token()
701    }
702
703    /// Get the current tokens for this session.
704    ///
705    /// To be notified of changes in the session tokens, use
706    /// [`Client::subscribe_to_session_changes()`] or
707    /// [`Client::set_session_callbacks()`].
708    ///
709    /// Returns `None` if the client has not been logged in.
710    pub fn session_tokens(&self) -> Option<SessionTokens> {
711        self.auth_ctx().session_tokens()
712    }
713
714    /// Access the authentication API used to log in this client.
715    ///
716    /// Will be `None` if the client has not been logged in.
717    pub fn auth_api(&self) -> Option<AuthApi> {
718        match self.auth_ctx().auth_data.get()? {
719            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
720            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
721        }
722    }
723
724    /// Get the whole session info of this client.
725    ///
726    /// Will be `None` if the client has not been logged in.
727    ///
728    /// Can be used with [`Client::restore_session`] to restore a previously
729    /// logged-in session.
730    pub fn session(&self) -> Option<AuthSession> {
731        match self.auth_api()? {
732            AuthApi::Matrix(api) => api.session().map(Into::into),
733            AuthApi::OAuth(api) => api.full_session().map(Into::into),
734        }
735    }
736
737    /// Get a reference to the state store.
738    pub fn state_store(&self) -> &DynStateStore {
739        self.base_client().state_store()
740    }
741
742    /// Get a reference to the event cache store.
743    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
744        self.base_client().event_cache_store()
745    }
746
747    /// Get a reference to the media store.
748    pub fn media_store(&self) -> &MediaStoreLock {
749        self.base_client().media_store()
750    }
751
752    /// Access the native Matrix authentication API with this client.
753    pub fn matrix_auth(&self) -> MatrixAuth {
754        MatrixAuth::new(self.clone())
755    }
756
757    /// Get the account of the current owner of the client.
758    pub fn account(&self) -> Account {
759        Account::new(self.clone())
760    }
761
762    /// Get the encryption manager of the client.
763    #[cfg(feature = "e2e-encryption")]
764    pub fn encryption(&self) -> Encryption {
765        Encryption::new(self.clone())
766    }
767
768    /// Get the media manager of the client.
769    pub fn media(&self) -> Media {
770        Media::new(self.clone())
771    }
772
773    /// Get the pusher manager of the client.
774    pub fn pusher(&self) -> Pusher {
775        Pusher::new(self.clone())
776    }
777
778    /// Access the OAuth 2.0 API of the client.
779    pub fn oauth(&self) -> OAuth {
780        OAuth::new(self.clone())
781    }
782
783    /// Register a handler for a specific event type.
784    ///
785    /// The handler is a function or closure with one or more arguments. The
786    /// first argument is the event itself. All additional arguments are
787    /// "context" arguments: They have to implement [`EventHandlerContext`].
788    /// This trait is named that way because most of the types implementing it
789    /// give additional context about an event: The room it was in, its raw form
790    /// and other similar things. As two exceptions to this,
791    /// [`Client`] and [`EventHandlerHandle`] also implement the
792    /// `EventHandlerContext` trait so you don't have to clone your client
793    /// into the event handler manually and a handler can decide to remove
794    /// itself.
795    ///
796    /// Some context arguments are not universally applicable. A context
797    /// argument that isn't available for the given event type will result in
798    /// the event handler being skipped and an error being logged. The following
799    /// context argument types are only available for a subset of event types:
800    ///
801    /// * [`Room`] is only available for room-specific events, i.e. not for
802    ///   events like global account data events or presence events.
803    ///
804    /// You can provide custom context via
805    /// [`add_event_handler_context`](Client::add_event_handler_context) and
806    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
807    /// into the event handler.
808    ///
809    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
810    ///
811    /// # Examples
812    ///
813    /// ```no_run
814    /// use matrix_sdk::{
815    ///     deserialized_responses::EncryptionInfo,
816    ///     event_handler::Ctx,
817    ///     ruma::{
818    ///         events::{
819    ///             macros::EventContent,
820    ///             push_rules::PushRulesEvent,
821    ///             room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
822    ///         },
823    ///         push::Action,
824    ///         Int, MilliSecondsSinceUnixEpoch,
825    ///     },
826    ///     Client, Room,
827    /// };
828    /// use serde::{Deserialize, Serialize};
829    ///
830    /// # async fn example(client: Client) {
831    /// client.add_event_handler(
832    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
833    ///         // Common usage: Room event plus room and client.
834    ///     },
835    /// );
836    /// client.add_event_handler(
837    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
838    ///         async move {
839    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
840    ///             // unencrypted events and events that were decrypted by the SDK.
841    ///         }
842    ///     },
843    /// );
844    /// client.add_event_handler(
845    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
846    ///         async move {
847    ///             // A `Vec<Action>` parameter allows you to know which push actions
848    ///             // are applicable for an event. For example, an event with
849    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
850    ///             // in the timeline.
851    ///         }
852    ///     },
853    /// );
854    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
855    ///     // You can omit any or all arguments after the first.
856    /// });
857    ///
858    /// // Registering a temporary event handler:
859    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
860    ///     /* Event handler */
861    /// });
862    /// client.remove_event_handler(handle);
863    ///
864    /// // Registering custom event handler context:
865    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
866    /// struct MyContext {
867    ///     number: usize,
868    /// }
869    /// client.add_event_handler_context(MyContext { number: 5 });
870    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
871    ///     // Use the context
872    /// });
873    ///
874    /// // Custom events work exactly the same way, you just need to declare
875    /// // the content struct and use the EventContent derive macro on it.
876    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
877    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
878    /// struct TokenEventContent {
879    ///     token: String,
880    ///     #[serde(rename = "exp")]
881    ///     expires_at: MilliSecondsSinceUnixEpoch,
882    /// }
883    ///
884    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
885    ///     todo!("Display the token");
886    /// });
887    ///
888    /// // Event handler closures can also capture local variables.
889    /// // Make sure they are cheap to clone though, because they will be cloned
890    /// // every time the closure is called.
891    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
892    ///
893    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
894    ///     println!("Calling the handler with identifier {data}");
895    /// });
896    /// # }
897    /// ```
898    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
899    where
900        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
901        H: EventHandler<Ev, Ctx>,
902    {
903        self.add_event_handler_impl(handler, None)
904    }
905
906    /// Register a handler for a specific room, and event type.
907    ///
908    /// This method works the same way as
909    /// [`add_event_handler`][Self::add_event_handler], except that the handler
910    /// will only be called for events in the room with the specified ID. See
911    /// that method for more details on event handler functions.
912    ///
913    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
914    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
915    /// your use case.
916    pub fn add_room_event_handler<Ev, Ctx, H>(
917        &self,
918        room_id: &RoomId,
919        handler: H,
920    ) -> EventHandlerHandle
921    where
922        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
923        H: EventHandler<Ev, Ctx>,
924    {
925        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
926    }
927
928    /// Observe a specific event type.
929    ///
930    /// `Ev` represents the kind of event that will be observed. `Ctx`
931    /// represents the context that will come with the event. It relies on the
932    /// same mechanism as [`Client::add_event_handler`]. The main difference is
933    /// that it returns an [`ObservableEventHandler`] and doesn't require a
934    /// user-defined closure. It is possible to subscribe to the
935    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
936    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
937    /// Ctx)`.
938    ///
939    /// Be careful that only the most recent value can be observed. Subscribers
940    /// are notified when a new value is sent, but there is no guarantee
941    /// that they will see all values.
942    ///
943    /// # Example
944    ///
945    /// Let's see a classical usage:
946    ///
947    /// ```
948    /// use futures_util::StreamExt as _;
949    /// use matrix_sdk::{
950    ///     Client, Room,
951    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
952    /// };
953    ///
954    /// # async fn example(client: Client) -> Option<()> {
955    /// let observer =
956    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
957    ///
958    /// let mut subscriber = observer.subscribe();
959    ///
960    /// let (event, (room, push_actions)) = subscriber.next().await?;
961    /// # Some(())
962    /// # }
963    /// ```
964    ///
965    /// Now let's see how to get several contexts that can be useful for you:
966    ///
967    /// ```
968    /// use matrix_sdk::{
969    ///     Client, Room,
970    ///     deserialized_responses::EncryptionInfo,
971    ///     ruma::{
972    ///         events::room::{
973    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
974    ///         },
975    ///         push::Action,
976    ///     },
977    /// };
978    ///
979    /// # async fn example(client: Client) {
980    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
981    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
982    ///
983    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
984    /// // to distinguish between unencrypted events and events that were decrypted
985    /// // by the SDK.
986    /// let _ = client
987    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
988    ///     );
989    ///
990    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
991    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
992    /// // should be highlighted in the timeline.
993    /// let _ =
994    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
995    ///
996    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
997    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
998    /// # }
999    /// ```
1000    ///
1001    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1002    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1003    where
1004        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1005        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1006    {
1007        self.observe_room_events_impl(None)
1008    }
1009
1010    /// Observe a specific room, and event type.
1011    ///
1012    /// This method works the same way as [`Client::observe_events`], except
1013    /// that the observability will only be applied for events in the room with
1014    /// the specified ID. See that method for more details.
1015    ///
1016    /// Be careful that only the most recent value can be observed. Subscribers
1017    /// are notified when a new value is sent, but there is no guarantee
1018    /// that they will see all values.
1019    pub fn observe_room_events<Ev, Ctx>(
1020        &self,
1021        room_id: &RoomId,
1022    ) -> ObservableEventHandler<(Ev, Ctx)>
1023    where
1024        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1025        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1026    {
1027        self.observe_room_events_impl(Some(room_id.to_owned()))
1028    }
1029
1030    /// Shared implementation for `Client::observe_events` and
1031    /// `Client::observe_room_events`.
1032    fn observe_room_events_impl<Ev, Ctx>(
1033        &self,
1034        room_id: Option<OwnedRoomId>,
1035    ) -> ObservableEventHandler<(Ev, Ctx)>
1036    where
1037        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1038        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1039    {
1040        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1041        // new value.
1042        let shared_observable = SharedObservable::new(None);
1043
1044        ObservableEventHandler::new(
1045            shared_observable.clone(),
1046            self.event_handler_drop_guard(self.add_event_handler_impl(
1047                move |event: Ev, context: Ctx| {
1048                    shared_observable.set(Some((event, context)));
1049
1050                    ready(())
1051                },
1052                room_id,
1053            )),
1054        )
1055    }
1056
1057    /// Remove the event handler associated with the handle.
1058    ///
1059    /// Note that you **must not** call `remove_event_handler` from the
1060    /// non-async part of an event handler, that is:
1061    ///
1062    /// ```ignore
1063    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1064    ///     // ⚠ this will cause a deadlock ⚠
1065    ///     client.remove_event_handler(handle);
1066    ///
1067    ///     async move {
1068    ///         // removing the event handler here is fine
1069    ///         client.remove_event_handler(handle);
1070    ///     }
1071    /// })
1072    /// ```
1073    ///
1074    /// Note also that handlers that remove themselves will still execute with
1075    /// events received in the same sync cycle.
1076    ///
1077    /// # Arguments
1078    ///
1079    /// `handle` - The [`EventHandlerHandle`] that is returned when
1080    /// registering the event handler with [`Client::add_event_handler`].
1081    ///
1082    /// # Examples
1083    ///
1084    /// ```no_run
1085    /// # use url::Url;
1086    /// # use tokio::sync::mpsc;
1087    /// #
1088    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1089    /// #
1090    /// use matrix_sdk::{
1091    ///     Client, event_handler::EventHandlerHandle,
1092    ///     ruma::events::room::member::SyncRoomMemberEvent,
1093    /// };
1094    /// #
1095    /// # futures_executor::block_on(async {
1096    /// # let client = matrix_sdk::Client::builder()
1097    /// #     .homeserver_url(homeserver)
1098    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1099    /// #     .build()
1100    /// #     .await
1101    /// #     .unwrap();
1102    ///
1103    /// client.add_event_handler(
1104    ///     |ev: SyncRoomMemberEvent,
1105    ///      client: Client,
1106    ///      handle: EventHandlerHandle| async move {
1107    ///         // Common usage: Check arriving Event is the expected one
1108    ///         println!("Expected RoomMemberEvent received!");
1109    ///         client.remove_event_handler(handle);
1110    ///     },
1111    /// );
1112    /// # });
1113    /// ```
1114    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1115        self.inner.event_handlers.remove(handle);
1116    }
1117
1118    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1119    /// the given handle.
1120    ///
1121    /// When the returned value is dropped, the event handler will be removed.
1122    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1123        EventHandlerDropGuard::new(handle, self.clone())
1124    }
1125
1126    /// Add an arbitrary value for use as event handler context.
1127    ///
1128    /// The value can be obtained in an event handler by adding an argument of
1129    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1130    ///
1131    /// If a value of the same type has been added before, it will be
1132    /// overwritten.
1133    ///
1134    /// # Examples
1135    ///
1136    /// ```no_run
1137    /// use matrix_sdk::{
1138    ///     Room, event_handler::Ctx,
1139    ///     ruma::events::room::message::SyncRoomMessageEvent,
1140    /// };
1141    /// # #[derive(Clone)]
1142    /// # struct SomeType;
1143    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1144    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1145    /// # futures_executor::block_on(async {
1146    /// # let client = matrix_sdk::Client::builder()
1147    /// #     .homeserver_url(homeserver)
1148    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1149    /// #     .build()
1150    /// #     .await
1151    /// #     .unwrap();
1152    ///
1153    /// // Handle used to send messages to the UI part of the app
1154    /// let my_gui_handle: SomeType = obtain_gui_handle();
1155    ///
1156    /// client.add_event_handler_context(my_gui_handle.clone());
1157    /// client.add_event_handler(
1158    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1159    ///         async move {
1160    ///             // gui_handle.send(DisplayMessage { message: ev });
1161    ///         }
1162    ///     },
1163    /// );
1164    /// # });
1165    /// ```
1166    pub fn add_event_handler_context<T>(&self, ctx: T)
1167    where
1168        T: Clone + Send + Sync + 'static,
1169    {
1170        self.inner.event_handlers.add_context(ctx);
1171    }
1172
1173    /// Register a handler for a notification.
1174    ///
1175    /// Similar to [`Client::add_event_handler`], but only allows functions
1176    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1177    /// [`Client`] for now.
1178    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1179    where
1180        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1181        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1182    {
1183        self.inner.notification_handlers.write().await.push(Box::new(
1184            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1185        ));
1186
1187        self
1188    }
1189
1190    /// Subscribe to all updates for the room with the given ID.
1191    ///
1192    /// The returned receiver will receive a new message for each sync response
1193    /// that contains updates for that room.
1194    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1195        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1196            btree_map::Entry::Vacant(entry) => {
1197                let (tx, rx) = broadcast::channel(8);
1198                entry.insert(tx);
1199                rx
1200            }
1201            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1202        }
1203    }
1204
1205    /// Subscribe to all updates to all rooms, whenever any has been received in
1206    /// a sync response.
1207    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1208        self.inner.room_updates_sender.subscribe()
1209    }
1210
1211    pub(crate) async fn notification_handlers(
1212        &self,
1213    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1214        self.inner.notification_handlers.read().await
1215    }
1216
1217    /// Get all the rooms the client knows about.
1218    ///
1219    /// This will return the list of joined, invited, and left rooms.
1220    pub fn rooms(&self) -> Vec<Room> {
1221        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1222    }
1223
1224    /// Get all the rooms the client knows about, filtered by room state.
1225    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1226        self.base_client()
1227            .rooms_filtered(filter)
1228            .into_iter()
1229            .map(|room| Room::new(self.clone(), room))
1230            .collect()
1231    }
1232
1233    /// Get a stream of all the rooms, in addition to the existing rooms.
1234    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1235        let (rooms, stream) = self.base_client().rooms_stream();
1236
1237        let map_room = |room| Room::new(self.clone(), room);
1238
1239        (
1240            rooms.into_iter().map(map_room).collect(),
1241            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1242        )
1243    }
1244
1245    /// Returns the joined rooms this client knows about.
1246    pub fn joined_rooms(&self) -> Vec<Room> {
1247        self.rooms_filtered(RoomStateFilter::JOINED)
1248    }
1249
1250    /// Returns the invited rooms this client knows about.
1251    pub fn invited_rooms(&self) -> Vec<Room> {
1252        self.rooms_filtered(RoomStateFilter::INVITED)
1253    }
1254
1255    /// Returns the left rooms this client knows about.
1256    pub fn left_rooms(&self) -> Vec<Room> {
1257        self.rooms_filtered(RoomStateFilter::LEFT)
1258    }
1259
1260    /// Returns the joined space rooms this client knows about.
1261    pub fn joined_space_rooms(&self) -> Vec<Room> {
1262        self.base_client()
1263            .rooms_filtered(RoomStateFilter::JOINED)
1264            .into_iter()
1265            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1266            .collect()
1267    }
1268
1269    /// Get a room with the given room id.
1270    ///
1271    /// # Arguments
1272    ///
1273    /// `room_id` - The unique id of the room that should be fetched.
1274    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1275        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1276    }
1277
1278    /// Gets the preview of a room, whether the current user has joined it or
1279    /// not.
1280    pub async fn get_room_preview(
1281        &self,
1282        room_or_alias_id: &RoomOrAliasId,
1283        via: Vec<OwnedServerName>,
1284    ) -> Result<RoomPreview> {
1285        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1286            Ok(room_id) => room_id.to_owned(),
1287            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1288        };
1289
1290        if let Some(room) = self.get_room(&room_id) {
1291            // The cached data can only be trusted if the room state is joined or
1292            // banned: for invite and knock rooms, no updates will be received
1293            // for the rooms after the invite/knock action took place so we may
1294            // have very out to date data for important fields such as
1295            // `join_rule`. For left rooms, the homeserver should return the latest info.
1296            match room.state() {
1297                RoomState::Joined | RoomState::Banned => {
1298                    return Ok(RoomPreview::from_known_room(&room).await);
1299                }
1300                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1301            }
1302        }
1303
1304        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1305    }
1306
1307    /// Resolve a room alias to a room id and a list of servers which know
1308    /// about it.
1309    ///
1310    /// # Arguments
1311    ///
1312    /// `room_alias` - The room alias to be resolved.
1313    pub async fn resolve_room_alias(
1314        &self,
1315        room_alias: &RoomAliasId,
1316    ) -> HttpResult<get_alias::v3::Response> {
1317        let request = get_alias::v3::Request::new(room_alias.to_owned());
1318        self.send(request).await
1319    }
1320
1321    /// Checks if a room alias is not in use yet.
1322    ///
1323    /// Returns:
1324    /// - `Ok(true)` if the room alias is available.
1325    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1326    ///   status code).
1327    /// - An `Err` otherwise.
1328    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1329        match self.resolve_room_alias(alias).await {
1330            // The room alias was resolved, so it's already in use.
1331            Ok(_) => Ok(false),
1332            Err(error) => {
1333                match error.client_api_error_kind() {
1334                    // The room alias wasn't found, so it's available.
1335                    Some(ErrorKind::NotFound) => Ok(true),
1336                    _ => Err(error),
1337                }
1338            }
1339        }
1340    }
1341
1342    /// Adds a new room alias associated with a room to the room directory.
1343    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1344        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1345        self.send(request).await?;
1346        Ok(())
1347    }
1348
1349    /// Removes a room alias from the room directory.
1350    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1351        let request = delete_alias::v3::Request::new(alias.to_owned());
1352        self.send(request).await?;
1353        Ok(())
1354    }
1355
1356    /// Update the homeserver from the login response well-known if needed.
1357    ///
1358    /// # Arguments
1359    ///
1360    /// * `login_well_known` - The `well_known` field from a successful login
1361    ///   response.
1362    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1363        if self.inner.respect_login_well_known
1364            && let Some(well_known) = login_well_known
1365            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1366        {
1367            self.set_homeserver(homeserver);
1368        }
1369    }
1370
1371    /// Similar to [`Client::restore_session_with`], with
1372    /// [`RoomLoadSettings::default()`].
1373    ///
1374    /// # Panics
1375    ///
1376    /// Panics if a session was already restored or logged in.
1377    #[instrument(skip_all)]
1378    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1379        self.restore_session_with(session, RoomLoadSettings::default()).await
1380    }
1381
1382    /// Restore a session previously logged-in using one of the available
1383    /// authentication APIs. The number of rooms to restore is controlled by
1384    /// [`RoomLoadSettings`].
1385    ///
1386    /// See the documentation of the corresponding authentication API's
1387    /// `restore_session` method for more information.
1388    ///
1389    /// # Panics
1390    ///
1391    /// Panics if a session was already restored or logged in.
1392    #[instrument(skip_all)]
1393    pub async fn restore_session_with(
1394        &self,
1395        session: impl Into<AuthSession>,
1396        room_load_settings: RoomLoadSettings,
1397    ) -> Result<()> {
1398        let session = session.into();
1399        match session {
1400            AuthSession::Matrix(session) => {
1401                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1402            }
1403            AuthSession::OAuth(session) => {
1404                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1405            }
1406        }
1407    }
1408
1409    /// Refresh the access token using the authentication API used to log into
1410    /// this session.
1411    ///
1412    /// See the documentation of the authentication API's `refresh_access_token`
1413    /// method for more information.
1414    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1415        let Some(auth_api) = self.auth_api() else {
1416            return Err(RefreshTokenError::RefreshTokenRequired);
1417        };
1418
1419        match auth_api {
1420            AuthApi::Matrix(api) => {
1421                trace!("Token refresh: Using the homeserver.");
1422                Box::pin(api.refresh_access_token()).await?;
1423            }
1424            AuthApi::OAuth(api) => {
1425                trace!("Token refresh: Using OAuth 2.0.");
1426                Box::pin(api.refresh_access_token()).await?;
1427            }
1428        }
1429
1430        Ok(())
1431    }
1432
1433    /// Log out the current session using the proper authentication API.
1434    ///
1435    /// # Errors
1436    ///
1437    /// Returns an error if the session is not authenticated or if an error
1438    /// occurred while making the request to the server.
1439    pub async fn logout(&self) -> Result<(), Error> {
1440        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1441        match auth_api {
1442            AuthApi::Matrix(matrix_auth) => {
1443                matrix_auth.logout().await?;
1444                Ok(())
1445            }
1446            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1447        }
1448    }
1449
1450    /// Get or upload a sync filter.
1451    ///
1452    /// This method will either get a filter ID from the store or upload the
1453    /// filter definition to the homeserver and return the new filter ID.
1454    ///
1455    /// # Arguments
1456    ///
1457    /// * `filter_name` - The unique name of the filter, this name will be used
1458    /// locally to store and identify the filter ID returned by the server.
1459    ///
1460    /// * `definition` - The filter definition that should be uploaded to the
1461    /// server if no filter ID can be found in the store.
1462    ///
1463    /// # Examples
1464    ///
1465    /// ```no_run
1466    /// # use matrix_sdk::{
1467    /// #    Client, config::SyncSettings,
1468    /// #    ruma::api::client::{
1469    /// #        filter::{
1470    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1471    /// #        },
1472    /// #        sync::sync_events::v3::Filter,
1473    /// #    }
1474    /// # };
1475    /// # use url::Url;
1476    /// # async {
1477    /// # let homeserver = Url::parse("http://example.com").unwrap();
1478    /// # let client = Client::new(homeserver).await.unwrap();
1479    /// let mut filter = FilterDefinition::default();
1480    ///
1481    /// // Let's enable member lazy loading.
1482    /// filter.room.state.lazy_load_options =
1483    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1484    ///
1485    /// let filter_id = client
1486    ///     .get_or_upload_filter("sync", filter)
1487    ///     .await
1488    ///     .unwrap();
1489    ///
1490    /// let sync_settings = SyncSettings::new()
1491    ///     .filter(Filter::FilterId(filter_id));
1492    ///
1493    /// let response = client.sync_once(sync_settings).await.unwrap();
1494    /// # };
1495    #[instrument(skip(self, definition))]
1496    pub async fn get_or_upload_filter(
1497        &self,
1498        filter_name: &str,
1499        definition: FilterDefinition,
1500    ) -> Result<String> {
1501        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1502            debug!("Found filter locally");
1503            Ok(filter)
1504        } else {
1505            debug!("Didn't find filter locally");
1506            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1507            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1508            let response = self.send(request).await?;
1509
1510            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1511
1512            Ok(response.filter_id)
1513        }
1514    }
1515
1516    /// Prepare to join a room by ID, by getting the current details about it
1517    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1518        let room = self.get_room(room_id)?;
1519
1520        let inviter = match room.invite_details().await {
1521            Ok(details) => details.inviter,
1522            Err(Error::WrongRoomState(_)) => None,
1523            Err(e) => {
1524                warn!("Error fetching invite details for room: {e:?}");
1525                None
1526            }
1527        };
1528
1529        Some(PreJoinRoomInfo { inviter })
1530    }
1531
1532    /// Finish joining a room.
1533    ///
1534    /// If the room was an invite that should be marked as a DM, will include it
1535    /// in the DM event after creating the joined room.
1536    ///
1537    /// If encrypted history sharing is enabled, will check to see if we have a
1538    /// key bundle, and import it if so.
1539    ///
1540    /// # Arguments
1541    ///
1542    /// * `room_id` - The `RoomId` of the room that was joined.
1543    /// * `pre_join_room_info` - Information about the room before we joined.
1544    async fn finish_join_room(
1545        &self,
1546        room_id: &RoomId,
1547        pre_join_room_info: Option<PreJoinRoomInfo>,
1548    ) -> Result<Room> {
1549        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1550            room.state() == RoomState::Invited
1551                && room.is_direct().await.unwrap_or_else(|e| {
1552                    warn!(%room_id, "is_direct() failed: {e}");
1553                    false
1554                })
1555        } else {
1556            false
1557        };
1558
1559        let base_room = self
1560            .base_client()
1561            .room_joined(
1562                room_id,
1563                pre_join_room_info
1564                    .as_ref()
1565                    .and_then(|info| info.inviter.as_ref())
1566                    .map(|i| i.user_id().to_owned()),
1567            )
1568            .await?;
1569        let room = Room::new(self.clone(), base_room);
1570
1571        if mark_as_dm {
1572            room.set_is_direct(true).await?;
1573        }
1574
1575        #[cfg(feature = "e2e-encryption")]
1576        if self.inner.enable_share_history_on_invite
1577            && let Some(inviter) =
1578                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1579        {
1580            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1581                .await?;
1582        }
1583
1584        // Suppress "unused variable" and "unused field" lints
1585        #[cfg(not(feature = "e2e-encryption"))]
1586        let _ = pre_join_room_info.map(|i| i.inviter);
1587
1588        Ok(room)
1589    }
1590
1591    /// Join a room by `RoomId`.
1592    ///
1593    /// Returns the `Room` in the joined state.
1594    ///
1595    /// # Arguments
1596    ///
1597    /// * `room_id` - The `RoomId` of the room to be joined.
1598    #[instrument(skip(self))]
1599    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1600        // See who invited us to this room, if anyone. Note we have to do this before
1601        // making the `/join` request, otherwise we could race against the sync.
1602        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1603
1604        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1605        let response = self.send(request).await?;
1606        self.finish_join_room(&response.room_id, pre_join_info).await
1607    }
1608
1609    /// Join a room by `RoomOrAliasId`.
1610    ///
1611    /// Returns the `Room` in the joined state.
1612    ///
1613    /// # Arguments
1614    ///
1615    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1616    ///   alias looks like `#name:example.com`.
1617    /// * `server_names` - The server names to be used for resolving the alias,
1618    ///   if needs be.
1619    pub async fn join_room_by_id_or_alias(
1620        &self,
1621        alias: &RoomOrAliasId,
1622        server_names: &[OwnedServerName],
1623    ) -> Result<Room> {
1624        let pre_join_info = {
1625            match alias.try_into() {
1626                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1627                Err(_) => {
1628                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1629                    // responding to an invitation to the room, and therefore don't need to handle
1630                    // things that happen as a result of invites.
1631                    None
1632                }
1633            }
1634        };
1635        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1636            via: server_names.to_owned(),
1637        });
1638        let response = self.send(request).await?;
1639        self.finish_join_room(&response.room_id, pre_join_info).await
1640    }
1641
1642    /// Search the homeserver's directory of public rooms.
1643    ///
1644    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1645    /// a `get_public_rooms::Response`.
1646    ///
1647    /// # Arguments
1648    ///
1649    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1650    ///
1651    /// * `since` - Pagination token from a previous request.
1652    ///
1653    /// * `server` - The name of the server, if `None` the requested server is
1654    ///   used.
1655    ///
1656    /// # Examples
1657    /// ```no_run
1658    /// use matrix_sdk::Client;
1659    /// # use url::Url;
1660    /// # let homeserver = Url::parse("http://example.com").unwrap();
1661    /// # let limit = Some(10);
1662    /// # let since = Some("since token");
1663    /// # let server = Some("servername.com".try_into().unwrap());
1664    /// # async {
1665    /// let mut client = Client::new(homeserver).await.unwrap();
1666    ///
1667    /// client.public_rooms(limit, since, server).await;
1668    /// # };
1669    /// ```
1670    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1671    pub async fn public_rooms(
1672        &self,
1673        limit: Option<u32>,
1674        since: Option<&str>,
1675        server: Option<&ServerName>,
1676    ) -> HttpResult<get_public_rooms::v3::Response> {
1677        let limit = limit.map(UInt::from);
1678
1679        let request = assign!(get_public_rooms::v3::Request::new(), {
1680            limit,
1681            since: since.map(ToOwned::to_owned),
1682            server: server.map(ToOwned::to_owned),
1683        });
1684        self.send(request).await
1685    }
1686
1687    /// Create a room with the given parameters.
1688    ///
1689    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1690    /// created room.
1691    ///
1692    /// If you want to create a direct message with one specific user, you can
1693    /// use [`create_dm`][Self::create_dm], which is more convenient than
1694    /// assembling the [`create_room::v3::Request`] yourself.
1695    ///
1696    /// If the `is_direct` field of the request is set to `true` and at least
1697    /// one user is invited, the room will be automatically added to the direct
1698    /// rooms in the account data.
1699    ///
1700    /// # Examples
1701    ///
1702    /// ```no_run
1703    /// use matrix_sdk::{
1704    ///     Client,
1705    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1706    /// };
1707    /// # use url::Url;
1708    /// #
1709    /// # async {
1710    /// # let homeserver = Url::parse("http://example.com").unwrap();
1711    /// let request = CreateRoomRequest::new();
1712    /// let client = Client::new(homeserver).await.unwrap();
1713    /// assert!(client.create_room(request).await.is_ok());
1714    /// # };
1715    /// ```
1716    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1717        let invite = request.invite.clone();
1718        let is_direct_room = request.is_direct;
1719        let response = self.send(request).await?;
1720
1721        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1722
1723        let joined_room = Room::new(self.clone(), base_room);
1724
1725        if is_direct_room
1726            && !invite.is_empty()
1727            && let Err(error) =
1728                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1729        {
1730            // FIXME: Retry in the background
1731            error!("Failed to mark room as DM: {error}");
1732        }
1733
1734        Ok(joined_room)
1735    }
1736
1737    /// Create a DM room.
1738    ///
1739    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1740    /// given user being invited, the room marked `is_direct` and both the
1741    /// creator and invitee getting the default maximum power level.
1742    ///
1743    /// If the `e2e-encryption` feature is enabled, the room will also be
1744    /// encrypted.
1745    ///
1746    /// # Arguments
1747    ///
1748    /// * `user_id` - The ID of the user to create a DM for.
1749    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1750        #[cfg(feature = "e2e-encryption")]
1751        let initial_state = vec![
1752            InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1753                .to_raw_any(),
1754        ];
1755
1756        #[cfg(not(feature = "e2e-encryption"))]
1757        let initial_state = vec![];
1758
1759        let request = assign!(create_room::v3::Request::new(), {
1760            invite: vec![user_id.to_owned()],
1761            is_direct: true,
1762            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1763            initial_state,
1764        });
1765
1766        self.create_room(request).await
1767    }
1768
1769    /// Search the homeserver's directory for public rooms with a filter.
1770    ///
1771    /// # Arguments
1772    ///
1773    /// * `room_search` - The easiest way to create this request is using the
1774    ///   `get_public_rooms_filtered::Request` itself.
1775    ///
1776    /// # Examples
1777    ///
1778    /// ```no_run
1779    /// # use url::Url;
1780    /// # use matrix_sdk::Client;
1781    /// # async {
1782    /// # let homeserver = Url::parse("http://example.com")?;
1783    /// use matrix_sdk::ruma::{
1784    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1785    /// };
1786    /// # let mut client = Client::new(homeserver).await?;
1787    ///
1788    /// let mut filter = Filter::new();
1789    /// filter.generic_search_term = Some("rust".to_owned());
1790    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1791    /// request.filter = filter;
1792    ///
1793    /// let response = client.public_rooms_filtered(request).await?;
1794    ///
1795    /// for room in response.chunk {
1796    ///     println!("Found room {room:?}");
1797    /// }
1798    /// # anyhow::Ok(()) };
1799    /// ```
1800    pub async fn public_rooms_filtered(
1801        &self,
1802        request: get_public_rooms_filtered::v3::Request,
1803    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1804        self.send(request).await
1805    }
1806
1807    /// Send an arbitrary request to the server, without updating client state.
1808    ///
1809    /// **Warning:** Because this method *does not* update the client state, it
1810    /// is important to make sure that you account for this yourself, and
1811    /// use wrapper methods where available.  This method should *only* be
1812    /// used if a wrapper method for the endpoint you'd like to use is not
1813    /// available.
1814    ///
1815    /// # Arguments
1816    ///
1817    /// * `request` - A filled out and valid request for the endpoint to be hit
1818    ///
1819    /// * `timeout` - An optional request timeout setting, this overrides the
1820    ///   default request setting if one was set.
1821    ///
1822    /// # Examples
1823    ///
1824    /// ```no_run
1825    /// # use matrix_sdk::{Client, config::SyncSettings};
1826    /// # use url::Url;
1827    /// # async {
1828    /// # let homeserver = Url::parse("http://localhost:8080")?;
1829    /// # let mut client = Client::new(homeserver).await?;
1830    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1831    ///
1832    /// // First construct the request you want to make
1833    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1834    /// // for all available Endpoints
1835    /// let user_id = user_id!("@example:localhost").to_owned();
1836    /// let request = profile::get_profile::v3::Request::new(user_id);
1837    ///
1838    /// // Start the request using Client::send()
1839    /// let response = client.send(request).await?;
1840    ///
1841    /// // Check the corresponding Response struct to find out what types are
1842    /// // returned
1843    /// # anyhow::Ok(()) };
1844    /// ```
1845    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1846    where
1847        Request: OutgoingRequest + Clone + Debug,
1848        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1849    {
1850        SendRequest {
1851            client: self.clone(),
1852            request,
1853            config: None,
1854            send_progress: Default::default(),
1855        }
1856    }
1857
1858    pub(crate) async fn send_inner<Request>(
1859        &self,
1860        request: Request,
1861        config: Option<RequestConfig>,
1862        send_progress: SharedObservable<TransmissionProgress>,
1863    ) -> HttpResult<Request::IncomingResponse>
1864    where
1865        Request: OutgoingRequest + Debug,
1866        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1867    {
1868        let homeserver = self.homeserver().to_string();
1869        let access_token = self.access_token();
1870
1871        self.inner
1872            .http_client
1873            .send(
1874                request,
1875                config,
1876                homeserver,
1877                access_token.as_deref(),
1878                &self.supported_versions().await?,
1879                send_progress,
1880            )
1881            .await
1882    }
1883
1884    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1885        _ = self
1886            .inner
1887            .auth_ctx
1888            .session_change_sender
1889            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1890    }
1891
1892    /// Fetches server versions from network; no caching.
1893    pub async fn fetch_server_versions(
1894        &self,
1895        request_config: Option<RequestConfig>,
1896    ) -> HttpResult<get_supported_versions::Response> {
1897        let server_versions = self
1898            .inner
1899            .http_client
1900            .send(
1901                get_supported_versions::Request::new(),
1902                request_config,
1903                self.homeserver().to_string(),
1904                None,
1905                &SupportedVersions {
1906                    versions: [MatrixVersion::V1_0].into(),
1907                    features: Default::default(),
1908                },
1909                Default::default(),
1910            )
1911            .await?;
1912
1913        Ok(server_versions)
1914    }
1915
1916    /// Fetches client well_known from network; no caching.
1917    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1918        let server_url_string = self
1919            .server()
1920            .unwrap_or(
1921                // Sometimes people configure their well-known directly on the homeserver so use
1922                // this as a fallback when the server name is unknown.
1923                &self.homeserver(),
1924            )
1925            .to_string();
1926
1927        let well_known = self
1928            .inner
1929            .http_client
1930            .send(
1931                discover_homeserver::Request::new(),
1932                Some(RequestConfig::short_retry()),
1933                server_url_string,
1934                None,
1935                &SupportedVersions {
1936                    versions: [MatrixVersion::V1_0].into(),
1937                    features: Default::default(),
1938                },
1939                Default::default(),
1940            )
1941            .await;
1942
1943        match well_known {
1944            Ok(well_known) => Some(well_known),
1945            Err(http_error) => {
1946                // It is perfectly valid to not have a well-known file.
1947                // Maybe we should check for a specific error code to be sure?
1948                warn!("Failed to fetch client well-known: {http_error}");
1949                None
1950            }
1951        }
1952    }
1953
1954    /// Load server info from storage, or fetch them from network and cache
1955    /// them.
1956    async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1957        match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1958            Ok(Some(stored)) => {
1959                if let Some(server_info) =
1960                    stored.into_server_info().and_then(|info| info.maybe_decode())
1961                {
1962                    return Ok(server_info);
1963                }
1964            }
1965            Ok(None) => {
1966                // fallthrough: cache is empty
1967            }
1968            Err(err) => {
1969                warn!("error when loading cached server info: {err}");
1970                // fallthrough to network.
1971            }
1972        }
1973
1974        let server_versions = self.fetch_server_versions(None).await?;
1975        let well_known = self.fetch_client_well_known().await;
1976        let server_info = ServerInfo::new(
1977            server_versions.versions.clone(),
1978            server_versions.unstable_features.clone(),
1979            well_known.map(Into::into),
1980        );
1981
1982        // Attempt to cache the result in storage.
1983        {
1984            if let Err(err) = self
1985                .state_store()
1986                .set_kv_data(
1987                    StateStoreDataKey::ServerInfo,
1988                    StateStoreDataValue::ServerInfo(server_info.clone()),
1989                )
1990                .await
1991            {
1992                warn!("error when caching server info: {err}");
1993            }
1994        }
1995
1996        Ok(server_info)
1997    }
1998
1999    async fn get_or_load_and_cache_server_info<
2000        Value,
2001        MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
2002    >(
2003        &self,
2004        map: MapFunction,
2005    ) -> HttpResult<Value> {
2006        let server_info = &self.inner.caches.server_info;
2007        if let CachedValue::Cached(val) = map(&*server_info.read().await) {
2008            return Ok(val);
2009        }
2010
2011        let mut guarded_server_info = server_info.write().await;
2012        if let CachedValue::Cached(val) = map(&guarded_server_info) {
2013            return Ok(val);
2014        }
2015
2016        let server_info = self.load_or_fetch_server_info().await?;
2017
2018        // Fill both unstable features and server versions at once.
2019        let mut supported = server_info.supported_versions();
2020        if supported.versions.is_empty() {
2021            supported.versions = [MatrixVersion::V1_0].into();
2022        }
2023
2024        guarded_server_info.supported_versions = CachedValue::Cached(supported);
2025        guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2026
2027        // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2028        // fetch an optional property), the function will always return some.
2029        Ok(map(&guarded_server_info).unwrap_cached_value())
2030    }
2031
2032    /// Get the Matrix versions and features supported by the homeserver by
2033    /// fetching them from the server or the cache.
2034    ///
2035    /// This is equivalent to calling both [`Client::server_versions()`] and
2036    /// [`Client::unstable_features()`]. To always fetch the result from the
2037    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2038    /// and then `.as_supported_versions()` on the response.
2039    ///
2040    /// # Examples
2041    ///
2042    /// ```no_run
2043    /// use ruma::api::{FeatureFlag, MatrixVersion};
2044    /// # use matrix_sdk::{Client, config::SyncSettings};
2045    /// # use url::Url;
2046    /// # async {
2047    /// # let homeserver = Url::parse("http://localhost:8080")?;
2048    /// # let mut client = Client::new(homeserver).await?;
2049    ///
2050    /// let supported = client.supported_versions().await?;
2051    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2052    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2053    ///
2054    /// let msc_x_feature = FeatureFlag::from("msc_x");
2055    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2056    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2057    /// # anyhow::Ok(()) };
2058    /// ```
2059    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2060        self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2061            .await
2062    }
2063
2064    /// Get the Matrix versions supported by the homeserver by fetching them
2065    /// from the server or the cache.
2066    ///
2067    /// # Examples
2068    ///
2069    /// ```no_run
2070    /// use ruma::api::MatrixVersion;
2071    /// # use matrix_sdk::{Client, config::SyncSettings};
2072    /// # use url::Url;
2073    /// # async {
2074    /// # let homeserver = Url::parse("http://localhost:8080")?;
2075    /// # let mut client = Client::new(homeserver).await?;
2076    ///
2077    /// let server_versions = client.server_versions().await?;
2078    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2079    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2080    /// # anyhow::Ok(()) };
2081    /// ```
2082    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2083        self.get_or_load_and_cache_server_info(|server_info| {
2084            server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2085        })
2086        .await
2087    }
2088
2089    /// Get the unstable features supported by the homeserver by fetching them
2090    /// from the server or the cache.
2091    ///
2092    /// # Examples
2093    ///
2094    /// ```no_run
2095    /// use matrix_sdk::ruma::api::FeatureFlag;
2096    /// # use matrix_sdk::{Client, config::SyncSettings};
2097    /// # use url::Url;
2098    /// # async {
2099    /// # let homeserver = Url::parse("http://localhost:8080")?;
2100    /// # let mut client = Client::new(homeserver).await?;
2101    ///
2102    /// let msc_x_feature = FeatureFlag::from("msc_x");
2103    /// let unstable_features = client.unstable_features().await?;
2104    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2105    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2106    /// # anyhow::Ok(()) };
2107    /// ```
2108    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2109        self.get_or_load_and_cache_server_info(|server_info| {
2110            server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2111        })
2112        .await
2113    }
2114
2115    /// Get information about the homeserver's advertised RTC foci by fetching
2116    /// the well-known file from the server or the cache.
2117    ///
2118    /// # Examples
2119    /// ```no_run
2120    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2121    /// # use url::Url;
2122    /// # async {
2123    /// # let homeserver = Url::parse("http://localhost:8080")?;
2124    /// # let mut client = Client::new(homeserver).await?;
2125    /// let rtc_foci = client.rtc_foci().await?;
2126    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2127    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2128    ///     _ => None,
2129    /// });
2130    /// if let Some(info) = default_livekit_focus_info {
2131    ///     println!("Default LiveKit service URL: {}", info.service_url);
2132    /// }
2133    /// # anyhow::Ok(()) };
2134    /// ```
2135    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2136        let well_known = self
2137            .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2138            .await?;
2139
2140        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2141    }
2142
2143    /// Empty the server version and unstable features cache.
2144    ///
2145    /// Since the SDK caches server info (versions, unstable features,
2146    /// well-known etc), it's possible to have a stale entry in the cache. This
2147    /// functions makes it possible to force reset it.
2148    pub async fn reset_server_info(&self) -> Result<()> {
2149        // Empty the in-memory caches.
2150        let mut guard = self.inner.caches.server_info.write().await;
2151        guard.supported_versions = CachedValue::NotSet;
2152
2153        // Empty the store cache.
2154        Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2155    }
2156
2157    /// Check whether MSC 4028 is enabled on the homeserver.
2158    ///
2159    /// # Examples
2160    ///
2161    /// ```no_run
2162    /// # use matrix_sdk::{Client, config::SyncSettings};
2163    /// # use url::Url;
2164    /// # async {
2165    /// # let homeserver = Url::parse("http://localhost:8080")?;
2166    /// # let mut client = Client::new(homeserver).await?;
2167    /// let msc4028_enabled =
2168    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2169    /// # anyhow::Ok(()) };
2170    /// ```
2171    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2172        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2173    }
2174
2175    /// Get information of all our own devices.
2176    ///
2177    /// # Examples
2178    ///
2179    /// ```no_run
2180    /// # use matrix_sdk::{Client, config::SyncSettings};
2181    /// # use url::Url;
2182    /// # async {
2183    /// # let homeserver = Url::parse("http://localhost:8080")?;
2184    /// # let mut client = Client::new(homeserver).await?;
2185    /// let response = client.devices().await?;
2186    ///
2187    /// for device in response.devices {
2188    ///     println!(
2189    ///         "Device: {} {}",
2190    ///         device.device_id,
2191    ///         device.display_name.as_deref().unwrap_or("")
2192    ///     );
2193    /// }
2194    /// # anyhow::Ok(()) };
2195    /// ```
2196    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2197        let request = get_devices::v3::Request::new();
2198
2199        self.send(request).await
2200    }
2201
2202    /// Delete the given devices from the server.
2203    ///
2204    /// # Arguments
2205    ///
2206    /// * `devices` - The list of devices that should be deleted from the
2207    ///   server.
2208    ///
2209    /// * `auth_data` - This request requires user interactive auth, the first
2210    ///   request needs to set this to `None` and will always fail with an
2211    ///   `UiaaResponse`. The response will contain information for the
2212    ///   interactive auth and the same request needs to be made but this time
2213    ///   with some `auth_data` provided.
2214    ///
2215    /// ```no_run
2216    /// # use matrix_sdk::{
2217    /// #    ruma::{api::client::uiaa, device_id},
2218    /// #    Client, Error, config::SyncSettings,
2219    /// # };
2220    /// # use serde_json::json;
2221    /// # use url::Url;
2222    /// # use std::collections::BTreeMap;
2223    /// # async {
2224    /// # let homeserver = Url::parse("http://localhost:8080")?;
2225    /// # let mut client = Client::new(homeserver).await?;
2226    /// let devices = &[device_id!("DEVICEID").to_owned()];
2227    ///
2228    /// if let Err(e) = client.delete_devices(devices, None).await {
2229    ///     if let Some(info) = e.as_uiaa_response() {
2230    ///         let mut password = uiaa::Password::new(
2231    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2232    ///             "wordpass".to_owned(),
2233    ///         );
2234    ///         password.session = info.session.clone();
2235    ///
2236    ///         client
2237    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2238    ///             .await?;
2239    ///     }
2240    /// }
2241    /// # anyhow::Ok(()) };
2242    pub async fn delete_devices(
2243        &self,
2244        devices: &[OwnedDeviceId],
2245        auth_data: Option<uiaa::AuthData>,
2246    ) -> HttpResult<delete_devices::v3::Response> {
2247        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2248        request.auth = auth_data;
2249
2250        self.send(request).await
2251    }
2252
2253    /// Change the display name of a device owned by the current user.
2254    ///
2255    /// Returns a `update_device::Response` which specifies the result
2256    /// of the operation.
2257    ///
2258    /// # Arguments
2259    ///
2260    /// * `device_id` - The ID of the device to change the display name of.
2261    /// * `display_name` - The new display name to set.
2262    pub async fn rename_device(
2263        &self,
2264        device_id: &DeviceId,
2265        display_name: &str,
2266    ) -> HttpResult<update_device::v3::Response> {
2267        let mut request = update_device::v3::Request::new(device_id.to_owned());
2268        request.display_name = Some(display_name.to_owned());
2269
2270        self.send(request).await
2271    }
2272
2273    /// Synchronize the client's state with the latest state on the server.
2274    ///
2275    /// ## Syncing Events
2276    ///
2277    /// Messages or any other type of event need to be periodically fetched from
2278    /// the server, this is achieved by sending a `/sync` request to the server.
2279    ///
2280    /// The first sync is sent out without a [`token`]. The response of the
2281    /// first sync will contain a [`next_batch`] field which should then be
2282    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2283    /// don't receive the same events multiple times.
2284    ///
2285    /// ## Long Polling
2286    ///
2287    /// A sync should in the usual case always be in flight. The
2288    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2289    /// long the server will wait for new events before it will respond.
2290    /// The server will respond immediately if some new events arrive before the
2291    /// timeout has expired. If no changes arrive and the timeout expires an
2292    /// empty sync response will be sent to the client.
2293    ///
2294    /// This method of sending a request that may not receive a response
2295    /// immediately is called long polling.
2296    ///
2297    /// ## Filtering Events
2298    ///
2299    /// The number or type of messages and events that the client should receive
2300    /// from the server can be altered using a [`Filter`].
2301    ///
2302    /// Filters can be non-trivial and, since they will be sent with every sync
2303    /// request, they may take up a bunch of unnecessary bandwidth.
2304    ///
2305    /// Luckily filters can be uploaded to the server and reused using an unique
2306    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2307    /// method.
2308    ///
2309    /// # Arguments
2310    ///
2311    /// * `sync_settings` - Settings for the sync call, this allows us to set
2312    /// various options to configure the sync:
2313    ///     * [`filter`] - To configure which events we receive and which get
2314    ///       [filtered] by the server
2315    ///     * [`timeout`] - To configure our [long polling] setup.
2316    ///     * [`token`] - To tell the server which events we already received
2317    ///       and where we wish to continue syncing.
2318    ///     * [`full_state`] - To tell the server that we wish to receive all
2319    ///       state events, regardless of our configured [`token`].
2320    ///     * [`set_presence`] - To tell the server to set the presence and to
2321    ///       which state.
2322    ///
2323    /// # Examples
2324    ///
2325    /// ```no_run
2326    /// # use url::Url;
2327    /// # async {
2328    /// # let homeserver = Url::parse("http://localhost:8080")?;
2329    /// # let username = "";
2330    /// # let password = "";
2331    /// use matrix_sdk::{
2332    ///     Client, config::SyncSettings,
2333    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2334    /// };
2335    ///
2336    /// let client = Client::new(homeserver).await?;
2337    /// client.matrix_auth().login_username(username, password).send().await?;
2338    ///
2339    /// // Sync once so we receive the client state and old messages.
2340    /// client.sync_once(SyncSettings::default()).await?;
2341    ///
2342    /// // Register our handler so we start responding once we receive a new
2343    /// // event.
2344    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2345    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2346    /// });
2347    ///
2348    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2349    /// // from our `sync_once()` call automatically.
2350    /// client.sync(SyncSettings::default()).await;
2351    /// # anyhow::Ok(()) };
2352    /// ```
2353    ///
2354    /// [`sync`]: #method.sync
2355    /// [`SyncSettings`]: crate::config::SyncSettings
2356    /// [`token`]: crate::config::SyncSettings#method.token
2357    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2358    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2359    /// [`set_presence`]: ruma::presence::PresenceState
2360    /// [`filter`]: crate::config::SyncSettings#method.filter
2361    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2362    /// [`next_batch`]: SyncResponse#structfield.next_batch
2363    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2364    /// [long polling]: #long-polling
2365    /// [filtered]: #filtering-events
2366    #[instrument(skip(self))]
2367    pub async fn sync_once(
2368        &self,
2369        sync_settings: crate::config::SyncSettings,
2370    ) -> Result<SyncResponse> {
2371        // The sync might not return for quite a while due to the timeout.
2372        // We'll see if there's anything crypto related to send out before we
2373        // sync, i.e. if we closed our client after a sync but before the
2374        // crypto requests were sent out.
2375        //
2376        // This will mostly be a no-op.
2377        #[cfg(feature = "e2e-encryption")]
2378        if let Err(e) = self.send_outgoing_requests().await {
2379            error!(error = ?e, "Error while sending outgoing E2EE requests");
2380        }
2381
2382        let token = match sync_settings.token {
2383            SyncToken::Specific(token) => Some(token),
2384            SyncToken::NoToken => None,
2385            SyncToken::ReusePrevious => self.sync_token().await,
2386        };
2387
2388        let request = assign!(sync_events::v3::Request::new(), {
2389            filter: sync_settings.filter.map(|f| *f),
2390            since: token,
2391            full_state: sync_settings.full_state,
2392            set_presence: sync_settings.set_presence,
2393            timeout: sync_settings.timeout,
2394            use_state_after: true,
2395        });
2396        let mut request_config = self.request_config();
2397        if let Some(timeout) = sync_settings.timeout {
2398            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2399            request_config.timeout = Some(base_timeout + timeout);
2400        }
2401
2402        let response = self.send(request).with_request_config(request_config).await?;
2403        let next_batch = response.next_batch.clone();
2404        let response = self.process_sync(response).await?;
2405
2406        #[cfg(feature = "e2e-encryption")]
2407        if let Err(e) = self.send_outgoing_requests().await {
2408            error!(error = ?e, "Error while sending outgoing E2EE requests");
2409        }
2410
2411        self.inner.sync_beat.notify(usize::MAX);
2412
2413        Ok(SyncResponse::new(next_batch, response))
2414    }
2415
2416    /// Repeatedly synchronize the client state with the server.
2417    ///
2418    /// This method will only return on error, if cancellation is needed
2419    /// the method should be wrapped in a cancelable task or the
2420    /// [`Client::sync_with_callback`] method can be used or
2421    /// [`Client::sync_with_result_callback`] if you want to handle error
2422    /// cases in the loop, too.
2423    ///
2424    /// This method will internally call [`Client::sync_once`] in a loop.
2425    ///
2426    /// This method can be used with the [`Client::add_event_handler`]
2427    /// method to react to individual events. If you instead wish to handle
2428    /// events in a bulk manner the [`Client::sync_with_callback`],
2429    /// [`Client::sync_with_result_callback`] and
2430    /// [`Client::sync_stream`] methods can be used instead. Those methods
2431    /// repeatedly return the whole sync response.
2432    ///
2433    /// # Arguments
2434    ///
2435    /// * `sync_settings` - Settings for the sync call. *Note* that those
2436    ///   settings will be only used for the first sync call. See the argument
2437    ///   docs for [`Client::sync_once`] for more info.
2438    ///
2439    /// # Return
2440    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2441    /// up to the user of the API to check the error and decide whether the sync
2442    /// should continue or not.
2443    ///
2444    /// # Examples
2445    ///
2446    /// ```no_run
2447    /// # use url::Url;
2448    /// # async {
2449    /// # let homeserver = Url::parse("http://localhost:8080")?;
2450    /// # let username = "";
2451    /// # let password = "";
2452    /// use matrix_sdk::{
2453    ///     Client, config::SyncSettings,
2454    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2455    /// };
2456    ///
2457    /// let client = Client::new(homeserver).await?;
2458    /// client.matrix_auth().login_username(&username, &password).send().await?;
2459    ///
2460    /// // Register our handler so we start responding once we receive a new
2461    /// // event.
2462    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2463    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2464    /// });
2465    ///
2466    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2467    /// // automatically.
2468    /// client.sync(SyncSettings::default()).await?;
2469    /// # anyhow::Ok(()) };
2470    /// ```
2471    ///
2472    /// [argument docs]: #method.sync_once
2473    /// [`sync_with_callback`]: #method.sync_with_callback
2474    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2475        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2476    }
2477
2478    /// Repeatedly call sync to synchronize the client state with the server.
2479    ///
2480    /// # Arguments
2481    ///
2482    /// * `sync_settings` - Settings for the sync call. *Note* that those
2483    ///   settings will be only used for the first sync call. See the argument
2484    ///   docs for [`Client::sync_once`] for more info.
2485    ///
2486    /// * `callback` - A callback that will be called every time a successful
2487    ///   response has been fetched from the server. The callback must return a
2488    ///   boolean which signalizes if the method should stop syncing. If the
2489    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2490    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2491    ///
2492    /// # Return
2493    /// The sync runs until an error occurs or the
2494    /// callback indicates that the Loop should stop. If the callback asked for
2495    /// a regular stop, the result will be `Ok(())` otherwise the
2496    /// `Err(Error)` is returned.
2497    ///
2498    /// # Examples
2499    ///
2500    /// The following example demonstrates how to sync forever while sending all
2501    /// the interesting events through a mpsc channel to another thread e.g. a
2502    /// UI thread.
2503    ///
2504    /// ```no_run
2505    /// # use std::time::Duration;
2506    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2507    /// # use url::Url;
2508    /// # async {
2509    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2510    /// # let mut client = Client::new(homeserver).await.unwrap();
2511    ///
2512    /// use tokio::sync::mpsc::channel;
2513    ///
2514    /// let (tx, rx) = channel(100);
2515    ///
2516    /// let sync_channel = &tx;
2517    /// let sync_settings = SyncSettings::new()
2518    ///     .timeout(Duration::from_secs(30));
2519    ///
2520    /// client
2521    ///     .sync_with_callback(sync_settings, |response| async move {
2522    ///         let channel = sync_channel;
2523    ///         for (room_id, room) in response.rooms.joined {
2524    ///             for event in room.timeline.events {
2525    ///                 channel.send(event).await.unwrap();
2526    ///             }
2527    ///         }
2528    ///
2529    ///         LoopCtrl::Continue
2530    ///     })
2531    ///     .await;
2532    /// };
2533    /// ```
2534    #[instrument(skip_all)]
2535    pub async fn sync_with_callback<C>(
2536        &self,
2537        sync_settings: crate::config::SyncSettings,
2538        callback: impl Fn(SyncResponse) -> C,
2539    ) -> Result<(), Error>
2540    where
2541        C: Future<Output = LoopCtrl>,
2542    {
2543        self.sync_with_result_callback(sync_settings, |result| async {
2544            Ok(callback(result?).await)
2545        })
2546        .await
2547    }
2548
2549    /// Repeatedly call sync to synchronize the client state with the server.
2550    ///
2551    /// # Arguments
2552    ///
2553    /// * `sync_settings` - Settings for the sync call. *Note* that those
2554    ///   settings will be only used for the first sync call. See the argument
2555    ///   docs for [`Client::sync_once`] for more info.
2556    ///
2557    /// * `callback` - A callback that will be called every time after a
2558    ///   response has been received, failure or not. The callback returns a
2559    ///   `Result<LoopCtrl, Error>`, too. When returning
2560    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2561    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2562    ///   function returns `Ok(())`. In case the callback can't handle the
2563    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2564    ///   which results in the sync ending and the `Err(Error)` being returned.
2565    ///
2566    /// # Return
2567    /// The sync runs until an error occurs that the callback can't handle or
2568    /// the callback indicates that the Loop should stop. If the callback
2569    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2570    /// `Err(Error)` is returned.
2571    ///
2572    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2573    /// this, and are handled first without sending the result to the
2574    /// callback. Only after they have exceeded is the `Result` handed to
2575    /// the callback.
2576    ///
2577    /// # Examples
2578    ///
2579    /// The following example demonstrates how to sync forever while sending all
2580    /// the interesting events through a mpsc channel to another thread e.g. a
2581    /// UI thread.
2582    ///
2583    /// ```no_run
2584    /// # use std::time::Duration;
2585    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2586    /// # use url::Url;
2587    /// # async {
2588    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2589    /// # let mut client = Client::new(homeserver).await.unwrap();
2590    /// #
2591    /// use tokio::sync::mpsc::channel;
2592    ///
2593    /// let (tx, rx) = channel(100);
2594    ///
2595    /// let sync_channel = &tx;
2596    /// let sync_settings = SyncSettings::new()
2597    ///     .timeout(Duration::from_secs(30));
2598    ///
2599    /// client
2600    ///     .sync_with_result_callback(sync_settings, |response| async move {
2601    ///         let channel = sync_channel;
2602    ///         let sync_response = response?;
2603    ///         for (room_id, room) in sync_response.rooms.joined {
2604    ///              for event in room.timeline.events {
2605    ///                  channel.send(event).await.unwrap();
2606    ///               }
2607    ///         }
2608    ///
2609    ///         Ok(LoopCtrl::Continue)
2610    ///     })
2611    ///     .await;
2612    /// };
2613    /// ```
2614    #[instrument(skip(self, callback))]
2615    pub async fn sync_with_result_callback<C>(
2616        &self,
2617        sync_settings: crate::config::SyncSettings,
2618        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2619    ) -> Result<(), Error>
2620    where
2621        C: Future<Output = Result<LoopCtrl, Error>>,
2622    {
2623        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2624
2625        while let Some(result) = sync_stream.next().await {
2626            trace!("Running callback");
2627            if callback(result).await? == LoopCtrl::Break {
2628                trace!("Callback told us to stop");
2629                break;
2630            }
2631            trace!("Done running callback");
2632        }
2633
2634        Ok(())
2635    }
2636
/// Repeatedly synchronize the client state with the server.
///
/// This method will internally call [`Client::sync_once`] in a loop and is
/// equivalent to the [`Client::sync`] method but the responses are provided
/// as an async stream.
///
/// The returned stream never ends on its own: every iteration yields the
/// `Result` of one sync, failed syncs included.
///
/// # Arguments
///
/// * `sync_settings` - Settings for the sync call. *Note* that those
///   settings will be only used for the first sync call. See the argument
///   docs for [`Client::sync_once`] for more info.
///
/// # Examples
///
/// ```no_run
/// # use url::Url;
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let username = "";
/// # let password = "";
/// use futures_util::StreamExt;
/// use matrix_sdk::{Client, config::SyncSettings};
///
/// let client = Client::new(homeserver).await?;
/// client.matrix_auth().login_username(&username, &password).send().await?;
///
/// let mut sync_stream =
///     Box::pin(client.sync_stream(SyncSettings::default()).await);
///
/// while let Some(Ok(response)) = sync_stream.next().await {
///     for room in response.rooms.joined.values() {
///         for e in &room.timeline.events {
///             if let Ok(event) = e.raw().deserialize() {
///                 println!("Received event {:?}", event);
///             }
///         }
///     }
/// }
///
/// # anyhow::Ok(()) };
/// ```
#[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
#[instrument(skip(self))]
pub async fn sync_stream(
    &self,
    mut sync_settings: crate::config::SyncSettings,
) -> impl Stream<Item = Result<SyncResponse>> + '_ {
    let mut is_first_sync = true;
    // Stash for the caller's timeout while it is removed from
    // `sync_settings` for the first sync (see below).
    let mut timeout = None;
    // Handed to `Client::delay_sync` after every iteration.
    let mut last_sync_time: Option<Instant> = None;

    // Captured now so that every sync of the stream can be instrumented
    // with the span that was current when the stream was created.
    let parent_span = Span::current();

    async_stream::stream!({
        loop {
            trace!("Syncing");

            if sync_settings.ignore_timeout_on_first_sync {
                if is_first_sync {
                    // First iteration: move the timeout out of the
                    // settings so that this sync runs without one.
                    timeout = sync_settings.timeout.take();
                } else if sync_settings.timeout.is_none() && timeout.is_some() {
                    // Later iteration: put the stashed timeout back. The
                    // stash is emptied by `take()`, so this happens once.
                    sync_settings.timeout = timeout.take();
                }

                is_first_sync = false;
            }

            // NOTE(review): `sync_loop_helper` is not visible here;
            // presumably it runs one sync and updates `sync_settings` for
            // the next iteration. Confirm against its definition.
            yield self
                .sync_loop_helper(&mut sync_settings)
                .instrument(parent_span.clone())
                .await;

            // NOTE(review): presumably throttles back-to-back syncs based
            // on `last_sync_time`. Confirm against `Client::delay_sync`.
            Client::delay_sync(&mut last_sync_time).await
        }
    })
}
2713
2714    /// Get the current, if any, sync token of the client.
2715    /// This will be None if the client didn't sync at least once.
2716    pub(crate) async fn sync_token(&self) -> Option<String> {
2717        self.inner.base_client.sync_token().await
2718    }
2719
2720    /// Gets information about the owner of a given access token.
2721    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2722        let request = whoami::v3::Request::new();
2723        self.send(request).await
2724    }
2725
2726    /// Subscribes a new receiver to client SessionChange broadcasts.
2727    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2728        let broadcast = &self.auth_ctx().session_change_sender;
2729        broadcast.subscribe()
2730    }
2731
2732    /// Sets the save/restore session callbacks.
2733    ///
2734    /// This is another mechanism to get synchronous updates to session tokens,
2735    /// while [`Self::subscribe_to_session_changes`] provides an async update.
2736    pub fn set_session_callbacks(
2737        &self,
2738        reload_session_callback: Box<ReloadSessionCallback>,
2739        save_session_callback: Box<SaveSessionCallback>,
2740    ) -> Result<()> {
2741        self.inner
2742            .auth_ctx
2743            .reload_session_callback
2744            .set(reload_session_callback)
2745            .map_err(|_| Error::MultipleSessionCallbacks)?;
2746
2747        self.inner
2748            .auth_ctx
2749            .save_session_callback
2750            .set(save_session_callback)
2751            .map_err(|_| Error::MultipleSessionCallbacks)?;
2752
2753        Ok(())
2754    }
2755
2756    /// Get the notification settings of the current owner of the client.
2757    pub async fn notification_settings(&self) -> NotificationSettings {
2758        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2759        NotificationSettings::new(self.clone(), ruleset)
2760    }
2761
/// Create a new specialized `Client` that can process notifications.
///
/// The new client gets its own `ClientInner`. It is built from clones of
/// this client's authentication context, HTTP client, event cache,
/// send-queue data and latest events, together with a base client obtained
/// from `clone_with_in_memory_state_store`.
///
/// See [`CrossProcessLock::new`] to learn more about
/// `cross_process_store_locks_holder_name`.
///
/// # Errors
///
/// Returns an error if cloning the base client with an in-memory state
/// store fails.
///
/// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
pub async fn notification_client(
    &self,
    cross_process_store_locks_holder_name: String,
) -> Result<Client> {
    // The arguments below are positional and some are feature-gated: keep
    // them in the order expected by `ClientInner::new`.
    let client = Client {
        inner: ClientInner::new(
            self.inner.auth_ctx.clone(),
            self.server().cloned(),
            self.homeserver(),
            self.sliding_sync_version(),
            self.inner.http_client.clone(),
            // NOTE(review): the meaning of the `false` flag is not visible
            // from here; check `clone_with_in_memory_state_store`.
            self.inner
                .base_client
                .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
                .await?,
            // Snapshot of the currently cached server info.
            self.inner.caches.server_info.read().await.clone(),
            self.inner.respect_login_well_known,
            self.inner.event_cache.clone(),
            self.inner.send_queue_data.clone(),
            self.inner.latest_events.clone(),
            #[cfg(feature = "e2e-encryption")]
            self.inner.e2ee.encryption_settings,
            #[cfg(feature = "e2e-encryption")]
            self.inner.enable_share_history_on_invite,
            // Borrowed above for the base client, moved here.
            cross_process_store_locks_holder_name,
            #[cfg(feature = "experimental-search")]
            self.inner.search_index.clone(),
            self.inner.thread_subscription_catchup.clone(),
        )
        .await,
    };

    Ok(client)
}
2802
2803    /// The [`EventCache`] instance for this [`Client`].
2804    pub fn event_cache(&self) -> &EventCache {
2805        // SAFETY: always initialized in the `Client` ctor.
2806        self.inner.event_cache.get().unwrap()
2807    }
2808
2809    /// The [`LatestEvents`] instance for this [`Client`].
2810    pub async fn latest_events(&self) -> &LatestEvents {
2811        self.inner
2812            .latest_events
2813            .get_or_init(|| async {
2814                LatestEvents::new(
2815                    WeakClient::from_client(self),
2816                    self.event_cache().clone(),
2817                    SendQueue::new(self.clone()),
2818                )
2819            })
2820            .await
2821    }
2822
2823    /// Waits until an at least partially synced room is received, and returns
2824    /// it.
2825    ///
2826    /// **Note: this function will loop endlessly until either it finds the room
2827    /// or an externally set timeout happens.**
2828    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2829        loop {
2830            if let Some(room) = self.get_room(room_id) {
2831                if room.is_state_partially_or_fully_synced() {
2832                    debug!("Found just created room!");
2833                    return room;
2834                }
2835                debug!("Room wasn't partially synced, waiting for sync beat to try again");
2836            } else {
2837                debug!("Room wasn't found, waiting for sync beat to try again");
2838            }
2839            self.inner.sync_beat.listen().await;
2840        }
2841    }
2842
2843    /// Knock on a room given its `room_id_or_alias` to ask for permission to
2844    /// join it.
2845    pub async fn knock(
2846        &self,
2847        room_id_or_alias: OwnedRoomOrAliasId,
2848        reason: Option<String>,
2849        server_names: Vec<OwnedServerName>,
2850    ) -> Result<Room> {
2851        let request =
2852            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2853        let response = self.send(request).await?;
2854        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2855        Ok(Room::new(self.clone(), base_room))
2856    }
2857
2858    /// Checks whether the provided `user_id` belongs to an ignored user.
2859    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2860        self.base_client().is_user_ignored(user_id).await
2861    }
2862
2863    /// Gets the `max_upload_size` value from the homeserver, getting either a
2864    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2865    /// missing.
2866    ///
2867    /// Check the spec for more info:
2868    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2869    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2870        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2871        if let Some(data) = max_upload_size_lock.get() {
2872            return Ok(data.to_owned());
2873        }
2874
2875        // Use the authenticated endpoint when the server supports it.
2876        let supported_versions = self.supported_versions().await?;
2877        let use_auth =
2878            authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2879
2880        let upload_size = if use_auth {
2881            self.send(authenticated_media::get_media_config::v1::Request::default())
2882                .await?
2883                .upload_size
2884        } else {
2885            #[allow(deprecated)]
2886            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2887        };
2888
2889        match max_upload_size_lock.set(upload_size) {
2890            Ok(_) => Ok(upload_size),
2891            Err(error) => {
2892                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2893            }
2894        }
2895    }
2896
/// Returns the settings this client uses to decrypt events.
#[cfg(feature = "e2e-encryption")]
pub fn decryption_settings(&self) -> &DecryptionSettings {
    let base_client = self.base_client();
    &base_client.decryption_settings
}
2902
/// The [`SearchIndex`] that belongs to this [`Client`].
#[cfg(feature = "experimental-search")]
pub fn search_index(&self) -> &SearchIndex {
    let inner = &self.inner;
    &inner.search_index
}
2908
2909    /// Whether the client is configured to take thread subscriptions (MSC4306
2910    /// and MSC4308) into account.
2911    ///
2912    /// This may cause filtering out of thread subscriptions, and loading the
2913    /// thread subscriptions via the sliding sync extension, when the room
2914    /// list service is being used.
2915    pub fn enabled_thread_subscriptions(&self) -> bool {
2916        match self.base_client().threading_support {
2917            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2918            ThreadingSupport::Disabled => false,
2919        }
2920    }
2921
2922    /// Fetch thread subscriptions changes between `from` and up to `to`.
2923    ///
2924    /// The `limit` optional parameter can be used to limit the number of
2925    /// entries in a response. It can also be overridden by the server, if
2926    /// it's deemed too large.
2927    pub async fn fetch_thread_subscriptions(
2928        &self,
2929        from: Option<String>,
2930        to: Option<String>,
2931        limit: Option<UInt>,
2932    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2933        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2934            from,
2935            to,
2936            limit,
2937        });
2938        Ok(self.send(request).await?)
2939    }
2940
2941    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
2942        self.inner.thread_subscription_catchup.get().unwrap()
2943    }
2944}
2945
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper: makes the crypto layer track the given users.
    ///
    /// # Panics
    ///
    /// Panics if the client has no Olm machine, or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();
        machine.update_tracked_users(user_ids).await.unwrap();
    }
}
2959
2960/// A weak reference to the inner client, useful when trying to get a handle
2961/// on the owning client.
2962#[derive(Clone, Debug)]
2963pub(crate) struct WeakClient {
2964    client: Weak<ClientInner>,
2965}
2966
2967impl WeakClient {
2968    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2969    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2970        Self { client: Arc::downgrade(client) }
2971    }
2972
2973    /// Construct a [`WeakClient`] from a [`Client`].
2974    pub fn from_client(client: &Client) -> Self {
2975        Self::from_inner(&client.inner)
2976    }
2977
2978    /// Attempts to get a [`Client`] from this [`WeakClient`].
2979    pub fn get(&self) -> Option<Client> {
2980        self.client.upgrade().map(|inner| Client { inner })
2981    }
2982
2983    /// Gets the number of strong (`Arc`) pointers still pointing to this
2984    /// client.
2985    #[allow(dead_code)]
2986    pub fn strong_count(&self) -> usize {
2987        self.client.strong_count()
2988    }
2989}
2990
/// Server information kept by the client: the supported versions and the
/// well-known file, each wrapped in a [`CachedValue`] so that "not cached
/// yet" can be told apart from an actual value.
#[derive(Clone)]
struct ClientServerInfo {
    /// The Matrix versions and unstable features the server supports (known
    /// ones only).
    supported_versions: CachedValue<SupportedVersions>,

    /// The server's well-known file, if any.
    ///
    /// `Cached(None)` means the value was cached and there is no file;
    /// `NotSet` means nothing has been cached yet.
    well_known: CachedValue<Option<WellKnownResponse>>,
}
3000
/// A value that may or may not have been cached yet.
///
/// Unlike a plain `Option`, this keeps two situations apart: a value that
/// was cached as `None` (because it doesn't exist) and a value that simply
/// has not been cached so far.
#[derive(Clone)]
enum CachedValue<Value> {
    /// A value has been cached.
    Cached(Value),
    /// Nothing has been cached yet.
    NotSet,
}

impl<Value> CachedValue<Value> {
    /// Consumes `self` and returns the cached value.
    ///
    /// # Panics
    ///
    /// Panics if no value has been cached.
    fn unwrap_cached_value(self) -> Value {
        let CachedValue::Cached(value) = self else {
            panic!("Tried to unwrap a cached value that wasn't set");
        };

        value
    }

    /// Borrows the content: turns a `&CachedValue<Value>` into a
    /// `CachedValue<&Value>`.
    fn as_ref(&self) -> CachedValue<&Value> {
        if let Self::Cached(value) = self {
            CachedValue::Cached(value)
        } else {
            CachedValue::NotSet
        }
    }

    /// Applies `f` to the cached value, if any, turning a
    /// `CachedValue<Value>` into a `CachedValue<Other>`. `NotSet` stays
    /// `NotSet` and `f` is not called.
    fn map<Other, F>(self, f: F) -> CachedValue<Other>
    where
        F: FnOnce(Value) -> Other,
    {
        if let Self::Cached(value) = self {
            CachedValue::Cached(f(value))
        } else {
            CachedValue::NotSet
        }
    }
}
3046
/// Information about the state of a room before we joined it.
///
/// The `Default` value carries no inviter.
#[derive(Debug, Clone, Default)]
struct PreJoinRoomInfo {
    /// The user who invited us to the room, if any.
    pub inviter: Option<RoomMember>,
}
3053
3054// The http mocking library is not supported for wasm32
3055#[cfg(all(test, not(target_family = "wasm")))]
3056pub(crate) mod tests {
3057    use std::{sync::Arc, time::Duration};
3058
3059    use assert_matches::assert_matches;
3060    use assert_matches2::assert_let;
3061    use eyeball::SharedObservable;
3062    use futures_util::{FutureExt, StreamExt, pin_mut};
3063    use js_int::{UInt, uint};
3064    use matrix_sdk_base::{
3065        RoomState,
3066        store::{MemoryStore, StoreConfig},
3067    };
3068    use matrix_sdk_test::{
3069        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3070        event_factory::EventFactory,
3071    };
3072    #[cfg(target_family = "wasm")]
3073    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3074
3075    use ruma::{
3076        RoomId, ServerName, UserId,
3077        api::{
3078            FeatureFlag, MatrixVersion,
3079            client::{
3080                discovery::discover_homeserver::RtcFocusInfo,
3081                room::create_room::v3::Request as CreateRoomRequest,
3082            },
3083        },
3084        assign,
3085        events::{
3086            ignored_user_list::IgnoredUserListEventContent,
3087            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3088        },
3089        owned_room_id, owned_user_id, room_alias_id, room_id,
3090    };
3091    use serde_json::json;
3092    use stream_assert::{assert_next_matches, assert_pending};
3093    use tokio::{
3094        spawn,
3095        time::{sleep, timeout},
3096    };
3097    use url::Url;
3098
3099    use super::Client;
3100    use crate::{
3101        Error, TransmissionProgress,
3102        client::{WeakClient, futures::SendMediaUploadRequest},
3103        config::{RequestConfig, SyncSettings},
3104        futures::SendRequest,
3105        media::MediaError,
3106        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3107    };
3108
3109    #[async_test]
3110    async fn test_account_data() {
3111        let server = MatrixMockServer::new().await;
3112        let client = server.client_builder().build().await;
3113
3114        let f = EventFactory::new();
3115        server
3116            .mock_sync()
3117            .ok_and_run(&client, |builder| {
3118                builder.add_global_account_data(
3119                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3120                );
3121            })
3122            .await;
3123
3124        let content = client
3125            .account()
3126            .account_data::<IgnoredUserListEventContent>()
3127            .await
3128            .unwrap()
3129            .unwrap()
3130            .deserialize()
3131            .unwrap();
3132
3133        assert_eq!(content.ignored_users.len(), 1);
3134    }
3135
3136    #[async_test]
3137    async fn test_successful_discovery() {
3138        // Imagine this is `matrix.org`.
3139        let server = MatrixMockServer::new().await;
3140        let server_url = server.uri();
3141
3142        // Imagine this is `matrix-client.matrix.org`.
3143        let homeserver = MatrixMockServer::new().await;
3144        let homeserver_url = homeserver.uri();
3145
3146        // Imagine Alice has the user ID `@alice:matrix.org`.
3147        let domain = server_url.strip_prefix("http://").unwrap();
3148        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3149
3150        // The `.well-known` is on the server (e.g. `matrix.org`).
3151        server
3152            .mock_well_known()
3153            .ok_with_homeserver_url(&homeserver_url)
3154            .mock_once()
3155            .named("well-known")
3156            .mount()
3157            .await;
3158
3159        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3160        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3161
3162        let client = Client::builder()
3163            .insecure_server_name_no_tls(alice.server_name())
3164            .build()
3165            .await
3166            .unwrap();
3167
3168        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3169        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3170        client.server_versions().await.unwrap();
3171    }
3172
3173    #[async_test]
3174    async fn test_discovery_broken_server() {
3175        let server = MatrixMockServer::new().await;
3176        let server_url = server.uri();
3177        let domain = server_url.strip_prefix("http://").unwrap();
3178        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3179
3180        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3181
3182        assert!(
3183            Client::builder()
3184                .insecure_server_name_no_tls(alice.server_name())
3185                .build()
3186                .await
3187                .is_err(),
3188            "Creating a client from a user ID should fail when the .well-known request fails."
3189        );
3190    }
3191
3192    #[async_test]
3193    async fn test_room_creation() {
3194        let server = MatrixMockServer::new().await;
3195        let client = server.client_builder().build().await;
3196
3197        server
3198            .mock_sync()
3199            .ok_and_run(&client, |builder| {
3200                builder.add_joined_room(
3201                    JoinedRoomBuilder::default()
3202                        .add_state_event(StateTestEvent::Member)
3203                        .add_state_event(StateTestEvent::PowerLevels),
3204                );
3205            })
3206            .await;
3207
3208        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3209        assert_eq!(room.state(), RoomState::Joined);
3210    }
3211
3212    #[async_test]
3213    async fn test_retry_limit_http_requests() {
3214        let server = MatrixMockServer::new().await;
3215        let client = server
3216            .client_builder()
3217            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3218            .build()
3219            .await;
3220
3221        assert!(client.request_config().retry_limit.unwrap() == 3);
3222
3223        server.mock_login().error500().expect(3).mount().await;
3224
3225        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3226    }
3227
3228    #[async_test]
3229    async fn test_retry_timeout_http_requests() {
3230        // Keep this timeout small so that the test doesn't take long
3231        let retry_timeout = Duration::from_secs(5);
3232        let server = MatrixMockServer::new().await;
3233        let client = server
3234            .client_builder()
3235            .on_builder(|builder| {
3236                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3237            })
3238            .build()
3239            .await;
3240
3241        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3242
3243        server.mock_login().error500().expect(2..).mount().await;
3244
3245        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3246    }
3247
3248    #[async_test]
3249    async fn test_short_retry_initial_http_requests() {
3250        let server = MatrixMockServer::new().await;
3251        let client = server
3252            .client_builder()
3253            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3254            .build()
3255            .await;
3256
3257        server.mock_login().error500().expect(3..).mount().await;
3258
3259        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3260    }
3261
3262    #[async_test]
3263    async fn test_no_retry_http_requests() {
3264        let server = MatrixMockServer::new().await;
3265        let client = server.client_builder().build().await;
3266
3267        server.mock_devices().error500().mock_once().mount().await;
3268
3269        client.devices().await.unwrap_err();
3270    }
3271
3272    #[async_test]
3273    async fn test_set_homeserver() {
3274        let client = MockClientBuilder::new(None).build().await;
3275        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3276
3277        let homeserver = Url::parse("http://example.com/").unwrap();
3278        client.set_homeserver(homeserver.clone());
3279        assert_eq!(client.homeserver(), homeserver);
3280    }
3281
3282    #[async_test]
3283    async fn test_search_user_request() {
3284        let server = MatrixMockServer::new().await;
3285        let client = server.client_builder().build().await;
3286
3287        server.mock_user_directory().ok().mock_once().mount().await;
3288
3289        let response = client.search_users("test", 50).await.unwrap();
3290        assert_eq!(response.results.len(), 1);
3291        let result = &response.results[0];
3292        assert_eq!(result.user_id.to_string(), "@test:example.me");
3293        assert_eq!(result.display_name.clone().unwrap(), "Test");
3294        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3295        assert!(!response.limited);
3296    }
3297
3298    #[async_test]
3299    async fn test_request_unstable_features() {
3300        let server = MatrixMockServer::new().await;
3301        let client = server.client_builder().no_server_versions().build().await;
3302
3303        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3304
3305        let unstable_features = client.unstable_features().await.unwrap();
3306        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3307        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3308    }
3309
3310    #[async_test]
3311    async fn test_can_homeserver_push_encrypted_event_to_device() {
3312        let server = MatrixMockServer::new().await;
3313        let client = server.client_builder().no_server_versions().build().await;
3314
3315        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3316
3317        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3318        assert!(msc4028_enabled);
3319    }
3320
3321    #[async_test]
3322    async fn test_recently_visited_rooms() {
3323        // Tracking recently visited rooms requires authentication
3324        let client = MockClientBuilder::new(None).unlogged().build().await;
3325        assert_matches!(
3326            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3327            Err(Error::AuthenticationRequired)
3328        );
3329
3330        let client = MockClientBuilder::new(None).build().await;
3331        let account = client.account();
3332
3333        // We should start off with an empty list
3334        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3335
3336        // Tracking a valid room id should add it to the list
3337        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3338        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3339        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3340
3341        // And the existing list shouldn't be changed
3342        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3343        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3344
3345        // Tracking the same room again shouldn't change the list
3346        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3347        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3348        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3349
3350        // Tracking a second room should add it to the front of the list
3351        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3352        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3353        assert_eq!(
3354            account.get_recently_visited_rooms().await.unwrap(),
3355            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3356        );
3357
3358        // Tracking the first room yet again should move it to the front of the list
3359        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3360        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3361        assert_eq!(
3362            account.get_recently_visited_rooms().await.unwrap(),
3363            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3364        );
3365
3366        // Tracking should be capped at 20
3367        for n in 0..20 {
3368            account
3369                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3370                .await
3371                .unwrap();
3372        }
3373
3374        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3375
3376        // And the initial rooms should've been pushed out
3377        let rooms = account.get_recently_visited_rooms().await.unwrap();
3378        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3379        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3380
3381        // And the last tracked room should be the first
3382        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3383    }
3384
3385    #[async_test]
3386    async fn test_client_no_cycle_with_event_cache() {
3387        let client = MockClientBuilder::new(None).build().await;
3388
3389        // Wait for the init tasks to die.
3390        sleep(Duration::from_secs(1)).await;
3391
3392        let weak_client = WeakClient::from_client(&client);
3393        assert_eq!(weak_client.strong_count(), 1);
3394
3395        {
3396            let room_id = room_id!("!room:example.org");
3397
3398            // Have the client know the room.
3399            let response = SyncResponseBuilder::default()
3400                .add_joined_room(JoinedRoomBuilder::new(room_id))
3401                .build_sync_response();
3402            client.inner.base_client.receive_sync_response(response).await.unwrap();
3403
3404            client.event_cache().subscribe().unwrap();
3405
3406            let (_room_event_cache, _drop_handles) =
3407                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3408        }
3409
3410        drop(client);
3411
3412        // Give a bit of time for background tasks to die.
3413        sleep(Duration::from_secs(1)).await;
3414
3415        // The weak client must be the last reference to the client now.
3416        assert_eq!(weak_client.strong_count(), 0);
3417        let client = weak_client.get();
3418        assert!(
3419            client.is_none(),
3420            "too many strong references to the client: {}",
3421            Arc::strong_count(&client.unwrap().inner)
3422        );
3423    }
3424
3425    #[async_test]
3426    async fn test_server_info_caching() {
3427        let server = MatrixMockServer::new().await;
3428        let server_url = server.uri();
3429        let domain = server_url.strip_prefix("http://").unwrap();
3430        let server_name = <&ServerName>::try_from(domain).unwrap();
3431        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3432
3433        let well_known_mock = server
3434            .mock_well_known()
3435            .ok()
3436            .named("well known mock")
3437            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3438            .mount_as_scoped()
3439            .await;
3440
3441        let versions_mock = server
3442            .mock_versions()
3443            .ok_with_unstable_features()
3444            .named("first versions mock")
3445            .expect(1)
3446            .mount_as_scoped()
3447            .await;
3448
3449        let memory_store = Arc::new(MemoryStore::new());
3450        let client = Client::builder()
3451            .insecure_server_name_no_tls(server_name)
3452            .store_config(
3453                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3454                    .state_store(memory_store.clone()),
3455            )
3456            .build()
3457            .await
3458            .unwrap();
3459
3460        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3461
3462        // These subsequent calls hit the in-memory cache.
3463        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3464        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3465
3466        drop(client);
3467
3468        let client = server
3469            .client_builder()
3470            .no_server_versions()
3471            .on_builder(|builder| {
3472                builder.store_config(
3473                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3474                        .state_store(memory_store.clone()),
3475                )
3476            })
3477            .build()
3478            .await;
3479
3480        // These calls to the new client hit the on-disk cache.
3481        assert!(
3482            client
3483                .unstable_features()
3484                .await
3485                .unwrap()
3486                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3487        );
3488        let supported = client.supported_versions().await.unwrap();
3489        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3490        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3491
3492        // Then this call hits the in-memory cache.
3493        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3494
3495        drop(versions_mock);
3496        drop(well_known_mock);
3497
3498        // Now, reset the cache, and observe the endpoints being called again once.
3499        client.reset_server_info().await.unwrap();
3500
3501        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3502
3503        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3504
3505        // Hits network again.
3506        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3507        // Hits in-memory cache again.
3508        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3509        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3510    }
3511
3512    #[async_test]
3513    async fn test_server_info_without_a_well_known() {
3514        let server = MatrixMockServer::new().await;
3515        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3516
3517        let versions_mock = server
3518            .mock_versions()
3519            .ok_with_unstable_features()
3520            .named("first versions mock")
3521            .expect(1)
3522            .mount_as_scoped()
3523            .await;
3524
3525        let memory_store = Arc::new(MemoryStore::new());
3526        let client = server
3527            .client_builder()
3528            .no_server_versions()
3529            .on_builder(|builder| {
3530                builder.store_config(
3531                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3532                        .state_store(memory_store.clone()),
3533                )
3534            })
3535            .build()
3536            .await;
3537
3538        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3539
3540        // These subsequent calls hit the in-memory cache.
3541        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3542        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3543
3544        drop(client);
3545
3546        let client = server
3547            .client_builder()
3548            .no_server_versions()
3549            .on_builder(|builder| {
3550                builder.store_config(
3551                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3552                        .state_store(memory_store.clone()),
3553                )
3554            })
3555            .build()
3556            .await;
3557
3558        // This call to the new client hits the on-disk cache.
3559        assert!(
3560            client
3561                .unstable_features()
3562                .await
3563                .unwrap()
3564                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3565        );
3566
3567        // Then this call hits the in-memory cache.
3568        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3569
3570        drop(versions_mock);
3571
3572        // Now, reset the cache, and observe the endpoints being called again once.
3573        client.reset_server_info().await.unwrap();
3574
3575        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3576
3577        // Hits network again.
3578        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3579        // Hits in-memory cache again.
3580        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3581        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3582    }
3583
3584    #[async_test]
3585    async fn test_no_network_doesnt_cause_infinite_retries() {
3586        // We want infinite retries for transient errors.
3587        let client = MockClientBuilder::new(None)
3588            .on_builder(|builder| builder.request_config(RequestConfig::new()))
3589            .build()
3590            .await;
3591
3592        // We don't define a mock server on purpose here, so that the error is really a
3593        // network error.
3594        client.whoami().await.unwrap_err();
3595    }
3596
3597    #[async_test]
3598    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3599        let server = MatrixMockServer::new().await;
3600        let client = server.client_builder().build().await;
3601
3602        let room_id = room_id!("!room:example.org");
3603
3604        server
3605            .mock_sync()
3606            .ok_and_run(&client, |builder| {
3607                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3608            })
3609            .await;
3610
3611        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3612        assert_eq!(room.room_id(), room_id);
3613    }
3614
3615    #[async_test]
3616    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3617        let server = MatrixMockServer::new().await;
3618        let client = server.client_builder().build().await;
3619
3620        let room_id = room_id!("!room:example.org");
3621
3622        let client = Arc::new(client);
3623
3624        // Perform the /sync request with a delay so it starts after the
3625        // `await_room_remote_echo` call has happened
3626        spawn({
3627            let client = client.clone();
3628            async move {
3629                sleep(Duration::from_millis(100)).await;
3630
3631                server
3632                    .mock_sync()
3633                    .ok_and_run(&client, |builder| {
3634                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3635                    })
3636                    .await;
3637            }
3638        });
3639
3640        let room =
3641            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3642        assert_eq!(room.room_id(), room_id);
3643    }
3644
3645    #[async_test]
3646    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3647        let client = MockClientBuilder::new(None).build().await;
3648
3649        let room_id = room_id!("!room:example.org");
3650        // Room is not present so the client won't be able to find it. The call will
3651        // timeout.
3652        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3653    }
3654
3655    #[async_test]
3656    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3657        let server = MatrixMockServer::new().await;
3658        let client = server.client_builder().build().await;
3659
3660        server.mock_create_room().ok().mount().await;
3661
3662        // Create a room in the internal store
3663        let room = client
3664            .create_room(assign!(CreateRoomRequest::new(), {
3665                invite: vec![],
3666                is_direct: false,
3667            }))
3668            .await
3669            .unwrap();
3670
3671        // Room is locally present, but not synced, the call will timeout
3672        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3673            .await
3674            .unwrap_err();
3675    }
3676
3677    #[async_test]
3678    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3679        let server = MatrixMockServer::new().await;
3680        let client = server.client_builder().build().await;
3681
3682        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3683
3684        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3685        assert_matches!(ret, Ok(true));
3686    }
3687
3688    #[async_test]
3689    async fn test_is_room_alias_available_if_alias_is_resolved() {
3690        let server = MatrixMockServer::new().await;
3691        let client = server.client_builder().build().await;
3692
3693        server
3694            .mock_room_directory_resolve_alias()
3695            .ok("!some_room_id:matrix.org", Vec::new())
3696            .expect(1)
3697            .mount()
3698            .await;
3699
3700        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3701        assert_matches!(ret, Ok(false));
3702    }
3703
3704    #[async_test]
3705    async fn test_is_room_alias_available_if_error_found() {
3706        let server = MatrixMockServer::new().await;
3707        let client = server.client_builder().build().await;
3708
3709        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3710
3711        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3712        assert_matches!(ret, Err(_));
3713    }
3714
3715    #[async_test]
3716    async fn test_create_room_alias() {
3717        let server = MatrixMockServer::new().await;
3718        let client = server.client_builder().build().await;
3719
3720        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3721
3722        let ret = client
3723            .create_room_alias(
3724                room_alias_id!("#some_alias:matrix.org"),
3725                room_id!("!some_room:matrix.org"),
3726            )
3727            .await;
3728        assert_matches!(ret, Ok(()));
3729    }
3730
3731    #[async_test]
3732    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3733        let server = MatrixMockServer::new().await;
3734        let client = server.client_builder().build().await;
3735
3736        let room_id = room_id!("!a-room:matrix.org");
3737
3738        // Make sure the summary endpoint is called once
3739        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3740
3741        // We create a locally cached invited room
3742        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3743
3744        // And we get a preview, the server endpoint was reached
3745        let preview = client
3746            .get_room_preview(room_id.into(), Vec::new())
3747            .await
3748            .expect("Room preview should be retrieved");
3749
3750        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3751    }
3752
3753    #[async_test]
3754    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3755        let server = MatrixMockServer::new().await;
3756        let client = server.client_builder().build().await;
3757
3758        let room_id = room_id!("!a-room:matrix.org");
3759
3760        // Make sure the summary endpoint is called once
3761        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3762
3763        // We create a locally cached left room
3764        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3765
3766        // And we get a preview, the server endpoint was reached
3767        let preview = client
3768            .get_room_preview(room_id.into(), Vec::new())
3769            .await
3770            .expect("Room preview should be retrieved");
3771
3772        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3773    }
3774
3775    #[async_test]
3776    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3777        let server = MatrixMockServer::new().await;
3778        let client = server.client_builder().build().await;
3779
3780        let room_id = room_id!("!a-room:matrix.org");
3781
3782        // Make sure the summary endpoint is called once
3783        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3784
3785        // We create a locally cached knocked room
3786        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3787
3788        // And we get a preview, the server endpoint was reached
3789        let preview = client
3790            .get_room_preview(room_id.into(), Vec::new())
3791            .await
3792            .expect("Room preview should be retrieved");
3793
3794        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3795    }
3796
3797    #[async_test]
3798    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3799        let server = MatrixMockServer::new().await;
3800        let client = server.client_builder().build().await;
3801
3802        let room_id = room_id!("!a-room:matrix.org");
3803
3804        // Make sure the summary endpoint is not called
3805        server.mock_room_summary().ok(room_id).never().mount().await;
3806
3807        // We create a locally cached joined room
3808        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3809
3810        // And we get a preview, no server endpoint was reached
3811        let preview = client
3812            .get_room_preview(room_id.into(), Vec::new())
3813            .await
3814            .expect("Room preview should be retrieved");
3815
3816        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3817    }
3818
3819    #[async_test]
3820    async fn test_media_preview_config() {
3821        let server = MatrixMockServer::new().await;
3822        let client = server.client_builder().build().await;
3823
3824        server
3825            .mock_sync()
3826            .ok_and_run(&client, |builder| {
3827                builder.add_custom_global_account_data(json!({
3828                    "content": {
3829                        "media_previews": "private",
3830                        "invite_avatars": "off"
3831                    },
3832                    "type": "m.media_preview_config"
3833                }));
3834            })
3835            .await;
3836
3837        let (initial_value, stream) =
3838            client.account().observe_media_preview_config().await.unwrap();
3839
3840        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3841        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3842        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3843        pin_mut!(stream);
3844        assert_pending!(stream);
3845
3846        server
3847            .mock_sync()
3848            .ok_and_run(&client, |builder| {
3849                builder.add_custom_global_account_data(json!({
3850                    "content": {
3851                        "media_previews": "off",
3852                        "invite_avatars": "on"
3853                    },
3854                    "type": "m.media_preview_config"
3855                }));
3856            })
3857            .await;
3858
3859        assert_next_matches!(
3860            stream,
3861            MediaPreviewConfigEventContent {
3862                media_previews: Some(MediaPreviews::Off),
3863                invite_avatars: Some(InviteAvatars::On),
3864                ..
3865            }
3866        );
3867        assert_pending!(stream);
3868    }
3869
3870    #[async_test]
3871    async fn test_unstable_media_preview_config() {
3872        let server = MatrixMockServer::new().await;
3873        let client = server.client_builder().build().await;
3874
3875        server
3876            .mock_sync()
3877            .ok_and_run(&client, |builder| {
3878                builder.add_custom_global_account_data(json!({
3879                    "content": {
3880                        "media_previews": "private",
3881                        "invite_avatars": "off"
3882                    },
3883                    "type": "io.element.msc4278.media_preview_config"
3884                }));
3885            })
3886            .await;
3887
3888        let (initial_value, stream) =
3889            client.account().observe_media_preview_config().await.unwrap();
3890
3891        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3892        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3893        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3894        pin_mut!(stream);
3895        assert_pending!(stream);
3896
3897        server
3898            .mock_sync()
3899            .ok_and_run(&client, |builder| {
3900                builder.add_custom_global_account_data(json!({
3901                    "content": {
3902                        "media_previews": "off",
3903                        "invite_avatars": "on"
3904                    },
3905                    "type": "io.element.msc4278.media_preview_config"
3906                }));
3907            })
3908            .await;
3909
3910        assert_next_matches!(
3911            stream,
3912            MediaPreviewConfigEventContent {
3913                media_previews: Some(MediaPreviews::Off),
3914                invite_avatars: Some(InviteAvatars::On),
3915                ..
3916            }
3917        );
3918        assert_pending!(stream);
3919    }
3920
3921    #[async_test]
3922    async fn test_media_preview_config_not_found() {
3923        let server = MatrixMockServer::new().await;
3924        let client = server.client_builder().build().await;
3925
3926        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3927
3928        assert!(initial_value.is_none());
3929    }
3930
3931    #[async_test]
3932    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3933        // The default Matrix version we use is 1.11 or higher, so authenticated media
3934        // is supported.
3935        let server = MatrixMockServer::new().await;
3936        let client = server.client_builder().build().await;
3937
3938        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3939
3940        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3941        client.load_or_fetch_max_upload_size().await.unwrap();
3942
3943        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3944    }
3945
3946    #[async_test]
3947    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3948        // The server must advertise support for the stable feature for authenticated
3949        // media support, so we mock the `GET /versions` response.
3950        let server = MatrixMockServer::new().await;
3951        let client = server.client_builder().no_server_versions().build().await;
3952
3953        server
3954            .mock_versions()
3955            .ok_custom(
3956                &["v1.7", "v1.8", "v1.9", "v1.10"],
3957                &[("org.matrix.msc3916.stable", true)].into(),
3958            )
3959            .named("versions")
3960            .expect(1)
3961            .mount()
3962            .await;
3963
3964        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3965
3966        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3967        client.load_or_fetch_max_upload_size().await.unwrap();
3968
3969        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3970    }
3971
3972    #[async_test]
3973    async fn test_load_or_fetch_max_upload_size_no_auth() {
3974        // The server must not support Matrix 1.11 or higher for unauthenticated
3975        // media requests, so we mock the `GET /versions` response.
3976        let server = MatrixMockServer::new().await;
3977        let client = server.client_builder().no_server_versions().build().await;
3978
3979        server
3980            .mock_versions()
3981            .ok_custom(&["v1.1"], &Default::default())
3982            .named("versions")
3983            .expect(1)
3984            .mount()
3985            .await;
3986
3987        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3988
3989        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
3990        client.load_or_fetch_max_upload_size().await.unwrap();
3991
3992        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3993    }
3994
3995    #[async_test]
3996    async fn test_uploading_a_too_large_media_file() {
3997        let server = MatrixMockServer::new().await;
3998        let client = server.client_builder().build().await;
3999
4000        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4001        client.load_or_fetch_max_upload_size().await.unwrap();
4002        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4003
4004        let data = vec![1, 2];
4005        let upload_request =
4006            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4007        let request = SendRequest {
4008            client: client.clone(),
4009            request: upload_request,
4010            config: None,
4011            send_progress: SharedObservable::new(TransmissionProgress::default()),
4012        };
4013        let media_request = SendMediaUploadRequest::new(request);
4014
4015        let error = media_request.await.err();
4016        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4017        assert_eq!(max, uint!(1));
4018        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4019    }
4020
4021    #[async_test]
4022    async fn test_dont_ignore_timeout_on_first_sync() {
4023        let server = MatrixMockServer::new().await;
4024        let client = server.client_builder().build().await;
4025
4026        server
4027            .mock_sync()
4028            .timeout(Some(Duration::from_secs(30)))
4029            .ok(|_| {})
4030            .mock_once()
4031            .named("sync_with_timeout")
4032            .mount()
4033            .await;
4034
4035        // Call the endpoint once to check the timeout.
4036        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4037
4038        timeout(Duration::from_secs(1), async {
4039            stream.next().await.unwrap().unwrap();
4040        })
4041        .await
4042        .unwrap();
4043    }
4044
4045    #[async_test]
4046    async fn test_ignore_timeout_on_first_sync() {
4047        let server = MatrixMockServer::new().await;
4048        let client = server.client_builder().build().await;
4049
4050        server
4051            .mock_sync()
4052            .timeout(None)
4053            .ok(|_| {})
4054            .mock_once()
4055            .named("sync_no_timeout")
4056            .mount()
4057            .await;
4058        server
4059            .mock_sync()
4060            .timeout(Some(Duration::from_secs(30)))
4061            .ok(|_| {})
4062            .mock_once()
4063            .named("sync_with_timeout")
4064            .mount()
4065            .await;
4066
4067        // Call each version of the endpoint once to check the timeouts.
4068        let mut stream = Box::pin(
4069            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4070        );
4071
4072        timeout(Duration::from_secs(1), async {
4073            stream.next().await.unwrap().unwrap();
4074            stream.next().await.unwrap().unwrap();
4075        })
4076        .await
4077        .unwrap();
4078    }
4079}