matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
32use matrix_sdk_base::{
33 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
34 StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
35 event_cache::store::EventCacheStoreLock,
36 media::store::MediaStoreLock,
37 store::{
38 DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
39 WellKnownResponse,
40 },
41 sync::{Notification, RoomUpdates},
42};
43use matrix_sdk_common::ttl_cache::TtlCache;
44#[cfg(feature = "e2e-encryption")]
45use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
46use ruma::{
47 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
48 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
49 api::{
50 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
51 auth_scheme::{AuthScheme, SendAccessToken},
52 client::{
53 account::whoami,
54 alias::{create_alias, delete_alias, get_alias},
55 authenticated_media,
56 device::{self, delete_devices, get_devices, update_device},
57 directory::{get_public_rooms, get_public_rooms_filtered},
58 discovery::{
59 discover_homeserver::{self, RtcFocusInfo},
60 get_capabilities::{self, v3::Capabilities},
61 get_supported_versions,
62 },
63 error::ErrorKind,
64 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
65 knock::knock_room,
66 media,
67 membership::{join_room_by_id, join_room_by_id_or_alias},
68 room::create_room,
69 session::login::v3::DiscoveryInfo,
70 sync::sync_events,
71 threads::get_thread_subscriptions_changes,
72 uiaa,
73 user_directory::search_users,
74 },
75 error::FromHttpResponseError,
76 path_builder::PathBuilder,
77 },
78 assign,
79 events::direct::DirectUserIdentifier,
80 push::Ruleset,
81 time::Instant,
82};
83use serde::de::DeserializeOwned;
84use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
85use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
86use url::Url;
87
88use self::{
89 caches::{CachedValue, ClientCaches},
90 futures::SendRequest,
91};
92use crate::{
93 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
94 Room, SessionTokens, TransmissionProgress,
95 authentication::{
96 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
97 oauth::OAuth,
98 },
99 client::thread_subscriptions::ThreadSubscriptionCatchup,
100 config::{RequestConfig, SyncToken},
101 deduplicating_handler::DeduplicatingHandler,
102 error::HttpResult,
103 event_cache::EventCache,
104 event_handler::{
105 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
106 EventHandlerStore, ObservableEventHandler, SyncEvent,
107 },
108 http_client::{HttpClient, SupportedPathBuilder},
109 latest_events::LatestEvents,
110 media::MediaError,
111 notification_settings::NotificationSettings,
112 room::RoomMember,
113 room_preview::RoomPreview,
114 send_queue::{SendQueue, SendQueueData},
115 sliding_sync::Version as SlidingSyncVersion,
116 sync::{RoomUpdate, SyncResponse},
117};
118#[cfg(feature = "e2e-encryption")]
119use crate::{
120 cross_process_lock::CrossProcessLock,
121 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
122};
123
124mod builder;
125pub(crate) mod caches;
126pub(crate) mod futures;
127pub(crate) mod thread_subscriptions;
128
129pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
130#[cfg(feature = "experimental-search")]
131use crate::search_index::SearchIndex;
132
133#[cfg(not(target_family = "wasm"))]
134type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
135#[cfg(target_family = "wasm")]
136type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
137
138#[cfg(not(target_family = "wasm"))]
139type NotificationHandlerFn =
140 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
141#[cfg(target_family = "wasm")]
142type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
143
144/// Enum controlling if a loop running callbacks should continue or abort.
145///
146/// This is mainly used in the [`sync_with_callback`] method, the return value
147/// of the provided callback controls if the sync loop should be exited.
148///
149/// [`sync_with_callback`]: #method.sync_with_callback
150#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151pub enum LoopCtrl {
152 /// Continue running the loop.
153 Continue,
154 /// Break out of the loop.
155 Break,
156}
157
158/// Represents changes that can occur to a `Client`s `Session`.
159#[derive(Debug, Clone, PartialEq)]
160pub enum SessionChange {
161 /// The session's token is no longer valid.
162 UnknownToken {
163 /// Whether or not the session was soft logged out
164 soft_logout: bool,
165 },
166 /// The session's tokens have been refreshed.
167 TokensRefreshed,
168}
169
170/// Information about the server vendor obtained from the federation API.
171#[derive(Debug, Clone, PartialEq, Eq)]
172#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
173pub struct ServerVendorInfo {
174 /// The server name.
175 pub server_name: String,
176 /// The server version.
177 pub version: String,
178}
179
180/// An async/await enabled Matrix client.
181///
182/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
183#[derive(Clone)]
184pub struct Client {
185 pub(crate) inner: Arc<ClientInner>,
186}
187
188#[derive(Default)]
189pub(crate) struct ClientLocks {
190 /// Lock ensuring that only a single room may be marked as a DM at once.
191 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
192 /// explanation.
193 pub(crate) mark_as_dm_lock: Mutex<()>,
194
195 /// Lock ensuring that only a single secret store is getting opened at the
196 /// same time.
197 ///
198 /// This is important so we don't accidentally create multiple different new
199 /// default secret storage keys.
200 #[cfg(feature = "e2e-encryption")]
201 pub(crate) open_secret_store_lock: Mutex<()>,
202
203 /// Lock ensuring that we're only storing a single secret at a time.
204 ///
205 /// Take a look at the [`SecretStore::put_secret`] method for a more
206 /// detailed explanation.
207 ///
208 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
209 #[cfg(feature = "e2e-encryption")]
210 pub(crate) store_secret_lock: Mutex<()>,
211
212 /// Lock ensuring that only one method at a time might modify our backup.
213 #[cfg(feature = "e2e-encryption")]
214 pub(crate) backup_modify_lock: Mutex<()>,
215
216 /// Lock ensuring that we're going to attempt to upload backups for a single
217 /// requester.
218 #[cfg(feature = "e2e-encryption")]
219 pub(crate) backup_upload_lock: Mutex<()>,
220
221 /// Handler making sure we only have one group session sharing request in
222 /// flight per room.
223 #[cfg(feature = "e2e-encryption")]
224 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
225
226 /// Lock making sure we're only doing one key claim request at a time.
227 #[cfg(feature = "e2e-encryption")]
228 pub(crate) key_claim_lock: Mutex<()>,
229
230 /// Handler to ensure that only one members request is running at a time,
231 /// given a room.
232 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
233
234 /// Handler to ensure that only one encryption state request is running at a
235 /// time, given a room.
236 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
237
238 /// Deduplicating handler for sending read receipts. The string is an
239 /// internal implementation detail, see [`Self::send_single_receipt`].
240 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
241
242 #[cfg(feature = "e2e-encryption")]
243 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
244
245 /// Latest "generation" of data known by the crypto store.
246 ///
247 /// This is a counter that only increments, set in the database (and can
248 /// wrap). It's incremented whenever some process acquires a lock for the
249 /// first time. *This assumes the crypto store lock is being held, to
250 /// avoid data races on writing to this value in the store*.
251 ///
252 /// The current process will maintain this value in local memory and in the
253 /// DB over time. Observing a different value than the one read in
254 /// memory, when reading from the store indicates that somebody else has
255 /// written into the database under our feet.
256 ///
257 /// TODO: this should live in the `OlmMachine`, since it's information
258 /// related to the lock. As of today (2023-07-28), we blow up the entire
259 /// olm machine when there's a generation mismatch. So storing the
260 /// generation in the olm machine would make the client think there's
261 /// *always* a mismatch, and that's why we need to store the generation
262 /// outside the `OlmMachine`.
263 #[cfg(feature = "e2e-encryption")]
264 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
265}
266
267pub(crate) struct ClientInner {
268 /// All the data related to authentication and authorization.
269 pub(crate) auth_ctx: Arc<AuthCtx>,
270
271 /// The URL of the server.
272 ///
273 /// Not to be confused with the `Self::homeserver`. `server` is usually
274 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
275 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
276 /// homeserver (at the time of writing — 2024-08-28).
277 ///
278 /// This value is optional depending on how the `Client` has been built.
279 /// If it's been built from a homeserver URL directly, we don't know the
280 /// server. However, if the `Client` has been built from a server URL or
281 /// name, then the homeserver has been discovered, and we know both.
282 server: Option<Url>,
283
284 /// The URL of the homeserver to connect to.
285 ///
286 /// This is the URL for the client-server Matrix API.
287 homeserver: StdRwLock<Url>,
288
289 /// The sliding sync version.
290 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
291
292 /// The underlying HTTP client.
293 pub(crate) http_client: HttpClient,
294
295 /// User session data.
296 pub(super) base_client: BaseClient,
297
298 /// Collection of in-memory caches for the [`Client`].
299 pub(crate) caches: ClientCaches,
300
301 /// Collection of locks individual client methods might want to use, either
302 /// to ensure that only a single call to a method happens at once or to
303 /// deduplicate multiple calls to a method.
304 pub(crate) locks: ClientLocks,
305
306 /// The cross-process store locks holder name.
307 ///
308 /// The SDK provides cross-process store locks (see
309 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
310 /// `holder_name` is the value used for all cross-process store locks
311 /// used by this `Client`.
312 ///
313 /// If multiple `Client`s are running in different processes, this
314 /// value MUST be different for each `Client`.
315 cross_process_store_locks_holder_name: String,
316
317 /// A mapping of the times at which the current user sent typing notices,
318 /// keyed by room.
319 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
320
321 /// Event handlers. See `add_event_handler`.
322 pub(crate) event_handlers: EventHandlerStore,
323
324 /// Notification handlers. See `register_notification_handler`.
325 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
326
327 /// The sender-side of channels used to receive room updates.
328 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
329
330 /// The sender-side of a channel used to observe all the room updates of a
331 /// sync response.
332 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
333
334 /// Whether the client should update its homeserver URL with the discovery
335 /// information present in the login response.
336 respect_login_well_known: bool,
337
338 /// An event that can be listened on to wait for a successful sync. The
339 /// event will only be fired if a sync loop is running. Can be used for
340 /// synchronization, e.g. if we send out a request to create a room, we can
341 /// wait for the sync to get the data to fetch a room object from the state
342 /// store.
343 pub(crate) sync_beat: event_listener::Event,
344
345 /// A central cache for events, inactive first.
346 ///
347 /// It becomes active when [`EventCache::subscribe`] is called.
348 pub(crate) event_cache: OnceCell<EventCache>,
349
350 /// End-to-end encryption related state.
351 #[cfg(feature = "e2e-encryption")]
352 pub(crate) e2ee: EncryptionData,
353
354 /// The verification state of our own device.
355 #[cfg(feature = "e2e-encryption")]
356 pub(crate) verification_state: SharedObservable<VerificationState>,
357
358 /// Whether to enable the experimental support for sending and receiving
359 /// encrypted room history on invite, per [MSC4268].
360 ///
361 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
362 #[cfg(feature = "e2e-encryption")]
363 pub(crate) enable_share_history_on_invite: bool,
364
365 /// Data related to the [`SendQueue`].
366 ///
367 /// [`SendQueue`]: crate::send_queue::SendQueue
368 pub(crate) send_queue_data: Arc<SendQueueData>,
369
370 /// The `max_upload_size` value of the homeserver, it contains the max
371 /// request size you can send.
372 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
373
374 /// The entry point to get the [`LatestEvent`] of rooms and threads.
375 ///
376 /// [`LatestEvent`]: crate::latest_event::LatestEvent
377 latest_events: OnceCell<LatestEvents>,
378
379 /// Service handling the catching up of thread subscriptions in the
380 /// background.
381 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
382
383 #[cfg(feature = "experimental-search")]
384 /// Handler for [`RoomIndex`]'s of each room
385 search_index: SearchIndex,
386}
387
388impl ClientInner {
389 /// Create a new `ClientInner`.
390 ///
391 /// All the fields passed as parameters here are those that must be cloned
392 /// upon instantiation of a sub-client, e.g. a client specialized for
393 /// notifications.
394 #[allow(clippy::too_many_arguments)]
395 async fn new(
396 auth_ctx: Arc<AuthCtx>,
397 server: Option<Url>,
398 homeserver: Url,
399 sliding_sync_version: SlidingSyncVersion,
400 http_client: HttpClient,
401 base_client: BaseClient,
402 supported_versions: CachedValue<SupportedVersions>,
403 well_known: CachedValue<Option<WellKnownResponse>>,
404 respect_login_well_known: bool,
405 event_cache: OnceCell<EventCache>,
406 send_queue: Arc<SendQueueData>,
407 latest_events: OnceCell<LatestEvents>,
408 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
409 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
410 cross_process_store_locks_holder_name: String,
411 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
412 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
413 ) -> Arc<Self> {
414 let caches = ClientCaches {
415 supported_versions: supported_versions.into(),
416 well_known: well_known.into(),
417 server_metadata: Mutex::new(TtlCache::new()),
418 };
419
420 let client = Self {
421 server,
422 homeserver: StdRwLock::new(homeserver),
423 auth_ctx,
424 sliding_sync_version: StdRwLock::new(sliding_sync_version),
425 http_client,
426 base_client,
427 caches,
428 locks: Default::default(),
429 cross_process_store_locks_holder_name,
430 typing_notice_times: Default::default(),
431 event_handlers: Default::default(),
432 notification_handlers: Default::default(),
433 room_update_channels: Default::default(),
434 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
435 // ballast for all observers to catch up.
436 room_updates_sender: broadcast::Sender::new(32),
437 respect_login_well_known,
438 sync_beat: event_listener::Event::new(),
439 event_cache,
440 send_queue_data: send_queue,
441 latest_events,
442 #[cfg(feature = "e2e-encryption")]
443 e2ee: EncryptionData::new(encryption_settings),
444 #[cfg(feature = "e2e-encryption")]
445 verification_state: SharedObservable::new(VerificationState::Unknown),
446 #[cfg(feature = "e2e-encryption")]
447 enable_share_history_on_invite,
448 server_max_upload_size: Mutex::new(OnceCell::new()),
449 #[cfg(feature = "experimental-search")]
450 search_index: search_index_handler,
451 thread_subscription_catchup,
452 };
453
454 #[allow(clippy::let_and_return)]
455 let client = Arc::new(client);
456
457 #[cfg(feature = "e2e-encryption")]
458 client.e2ee.initialize_tasks(&client);
459
460 let _ = client
461 .event_cache
462 .get_or_init(|| async {
463 EventCache::new(
464 WeakClient::from_inner(&client),
465 client.base_client.event_cache_store().clone(),
466 )
467 })
468 .await;
469
470 let _ = client
471 .thread_subscription_catchup
472 .get_or_init(|| async {
473 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
474 })
475 .await;
476
477 client
478 }
479}
480
481#[cfg(not(tarpaulin_include))]
482impl Debug for Client {
483 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
484 write!(fmt, "Client")
485 }
486}
487
488impl Client {
489 /// Create a new [`Client`] that will use the given homeserver.
490 ///
491 /// # Arguments
492 ///
493 /// * `homeserver_url` - The homeserver that the client should connect to.
494 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
495 Self::builder().homeserver_url(homeserver_url).build().await
496 }
497
498 /// Returns a subscriber that publishes an event every time the ignore user
499 /// list changes.
500 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
501 self.inner.base_client.subscribe_to_ignore_user_list_changes()
502 }
503
504 /// Create a new [`ClientBuilder`].
505 pub fn builder() -> ClientBuilder {
506 ClientBuilder::new()
507 }
508
509 pub(crate) fn base_client(&self) -> &BaseClient {
510 &self.inner.base_client
511 }
512
513 /// The underlying HTTP client.
514 pub fn http_client(&self) -> &reqwest::Client {
515 &self.inner.http_client.inner
516 }
517
518 pub(crate) fn locks(&self) -> &ClientLocks {
519 &self.inner.locks
520 }
521
522 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
523 &self.inner.auth_ctx
524 }
525
526 /// The cross-process store locks holder name.
527 ///
528 /// The SDK provides cross-process store locks (see
529 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
530 /// `holder_name` is the value used for all cross-process store locks
531 /// used by this `Client`.
532 pub fn cross_process_store_locks_holder_name(&self) -> &str {
533 &self.inner.cross_process_store_locks_holder_name
534 }
535
536 /// Change the homeserver URL used by this client.
537 ///
538 /// # Arguments
539 ///
540 /// * `homeserver_url` - The new URL to use.
541 fn set_homeserver(&self, homeserver_url: Url) {
542 *self.inner.homeserver.write().unwrap() = homeserver_url;
543 }
544
545 /// Change to a different homeserver and re-resolve well-known.
546 #[cfg(feature = "e2e-encryption")]
547 pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
548 &self,
549 homeserver_url: Url,
550 ) -> Result<()> {
551 self.set_homeserver(homeserver_url);
552 self.reset_well_known().await?;
553 if let Some(well_known) = self.load_or_fetch_well_known().await? {
554 self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
555 }
556 Ok(())
557 }
558
559 /// Get the capabilities of the homeserver.
560 ///
561 /// This method should be used to check what features are supported by the
562 /// homeserver.
563 ///
564 /// # Examples
565 ///
566 /// ```no_run
567 /// # use matrix_sdk::Client;
568 /// # use url::Url;
569 /// # async {
570 /// # let homeserver = Url::parse("http://example.com")?;
571 /// let client = Client::new(homeserver).await?;
572 ///
573 /// let capabilities = client.get_capabilities().await?;
574 ///
575 /// if capabilities.change_password.enabled {
576 /// // Change password
577 /// }
578 /// # anyhow::Ok(()) };
579 /// ```
580 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
581 let res = self.send(get_capabilities::v3::Request::new()).await?;
582 Ok(res.capabilities)
583 }
584
585 /// Get the server vendor information from the federation API.
586 ///
587 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
588 /// both the server's software name and version.
589 ///
590 /// # Examples
591 ///
592 /// ```no_run
593 /// # use matrix_sdk::Client;
594 /// # use url::Url;
595 /// # async {
596 /// # let homeserver = Url::parse("http://example.com")?;
597 /// let client = Client::new(homeserver).await?;
598 ///
599 /// let server_info = client.server_vendor_info(None).await?;
600 /// println!(
601 /// "Server: {}, Version: {}",
602 /// server_info.server_name, server_info.version
603 /// );
604 /// # anyhow::Ok(()) };
605 /// ```
606 #[cfg(feature = "federation-api")]
607 pub async fn server_vendor_info(
608 &self,
609 request_config: Option<RequestConfig>,
610 ) -> HttpResult<ServerVendorInfo> {
611 use ruma::api::federation::discovery::get_server_version;
612
613 let res = self
614 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
615 .await?;
616
617 // Extract server info, using defaults if fields are missing.
618 let server = res.server.unwrap_or_default();
619 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
620 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
621
622 Ok(ServerVendorInfo { server_name: server_name_str, version })
623 }
624
625 /// Get a copy of the default request config.
626 ///
627 /// The default request config is what's used when sending requests if no
628 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
629 /// function with such a parameter.
630 ///
631 /// If the default request config was not customized through
632 /// [`ClientBuilder`] when creating this `Client`, the returned value will
633 /// be equivalent to [`RequestConfig::default()`].
634 pub fn request_config(&self) -> RequestConfig {
635 self.inner.http_client.request_config
636 }
637
638 /// Check whether the client has been activated.
639 ///
640 /// A client is considered active when:
641 ///
642 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
643 /// is logged in,
644 /// 2. Has loaded cached data from storage,
645 /// 3. If encryption is enabled, it also initialized or restored its
646 /// `OlmMachine`.
647 pub fn is_active(&self) -> bool {
648 self.inner.base_client.is_active()
649 }
650
651 /// The server used by the client.
652 ///
653 /// See `Self::server` to learn more.
654 pub fn server(&self) -> Option<&Url> {
655 self.inner.server.as_ref()
656 }
657
658 /// The homeserver of the client.
659 pub fn homeserver(&self) -> Url {
660 self.inner.homeserver.read().unwrap().clone()
661 }
662
663 /// Get the sliding sync version.
664 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
665 self.inner.sliding_sync_version.read().unwrap().clone()
666 }
667
668 /// Override the sliding sync version.
669 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
670 let mut lock = self.inner.sliding_sync_version.write().unwrap();
671 *lock = version;
672 }
673
674 /// Get the Matrix user session meta information.
675 ///
676 /// If the client is currently logged in, this will return a
677 /// [`SessionMeta`] object which contains the user ID and device ID.
678 /// Otherwise it returns `None`.
679 pub fn session_meta(&self) -> Option<&SessionMeta> {
680 self.base_client().session_meta()
681 }
682
683 /// Returns a receiver that gets events for each room info update. To watch
684 /// for new events, use `receiver.resubscribe()`.
685 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
686 self.base_client().room_info_notable_update_receiver()
687 }
688
689 /// Performs a search for users.
690 /// The search is performed case-insensitively on user IDs and display names
691 ///
692 /// # Arguments
693 ///
694 /// * `search_term` - The search term for the search
695 /// * `limit` - The maximum number of results to return. Defaults to 10.
696 ///
697 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
698 pub async fn search_users(
699 &self,
700 search_term: &str,
701 limit: u64,
702 ) -> HttpResult<search_users::v3::Response> {
703 let mut request = search_users::v3::Request::new(search_term.to_owned());
704
705 if let Some(limit) = UInt::new(limit) {
706 request.limit = limit;
707 }
708
709 self.send(request).await
710 }
711
712 /// Get the user id of the current owner of the client.
713 pub fn user_id(&self) -> Option<&UserId> {
714 self.session_meta().map(|s| s.user_id.as_ref())
715 }
716
717 /// Get the device ID that identifies the current session.
718 pub fn device_id(&self) -> Option<&DeviceId> {
719 self.session_meta().map(|s| s.device_id.as_ref())
720 }
721
722 /// Get the current access token for this session.
723 ///
724 /// Will be `None` if the client has not been logged in.
725 pub fn access_token(&self) -> Option<String> {
726 self.auth_ctx().access_token()
727 }
728
729 /// Get the current tokens for this session.
730 ///
731 /// To be notified of changes in the session tokens, use
732 /// [`Client::subscribe_to_session_changes()`] or
733 /// [`Client::set_session_callbacks()`].
734 ///
735 /// Returns `None` if the client has not been logged in.
736 pub fn session_tokens(&self) -> Option<SessionTokens> {
737 self.auth_ctx().session_tokens()
738 }
739
740 /// Access the authentication API used to log in this client.
741 ///
742 /// Will be `None` if the client has not been logged in.
743 pub fn auth_api(&self) -> Option<AuthApi> {
744 match self.auth_ctx().auth_data.get()? {
745 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
746 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
747 }
748 }
749
750 /// Get the whole session info of this client.
751 ///
752 /// Will be `None` if the client has not been logged in.
753 ///
754 /// Can be used with [`Client::restore_session`] to restore a previously
755 /// logged-in session.
756 pub fn session(&self) -> Option<AuthSession> {
757 match self.auth_api()? {
758 AuthApi::Matrix(api) => api.session().map(Into::into),
759 AuthApi::OAuth(api) => api.full_session().map(Into::into),
760 }
761 }
762
763 /// Get a reference to the state store.
764 pub fn state_store(&self) -> &DynStateStore {
765 self.base_client().state_store()
766 }
767
768 /// Get a reference to the event cache store.
769 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
770 self.base_client().event_cache_store()
771 }
772
773 /// Get a reference to the media store.
774 pub fn media_store(&self) -> &MediaStoreLock {
775 self.base_client().media_store()
776 }
777
778 /// Access the native Matrix authentication API with this client.
779 pub fn matrix_auth(&self) -> MatrixAuth {
780 MatrixAuth::new(self.clone())
781 }
782
783 /// Get the account of the current owner of the client.
784 pub fn account(&self) -> Account {
785 Account::new(self.clone())
786 }
787
788 /// Get the encryption manager of the client.
789 #[cfg(feature = "e2e-encryption")]
790 pub fn encryption(&self) -> Encryption {
791 Encryption::new(self.clone())
792 }
793
794 /// Get the media manager of the client.
795 pub fn media(&self) -> Media {
796 Media::new(self.clone())
797 }
798
799 /// Get the pusher manager of the client.
800 pub fn pusher(&self) -> Pusher {
801 Pusher::new(self.clone())
802 }
803
804 /// Access the OAuth 2.0 API of the client.
805 pub fn oauth(&self) -> OAuth {
806 OAuth::new(self.clone())
807 }
808
809 /// Register a handler for a specific event type.
810 ///
811 /// The handler is a function or closure with one or more arguments. The
812 /// first argument is the event itself. All additional arguments are
813 /// "context" arguments: They have to implement [`EventHandlerContext`].
814 /// This trait is named that way because most of the types implementing it
815 /// give additional context about an event: The room it was in, its raw form
816 /// and other similar things. As two exceptions to this,
817 /// [`Client`] and [`EventHandlerHandle`] also implement the
818 /// `EventHandlerContext` trait so you don't have to clone your client
819 /// into the event handler manually and a handler can decide to remove
820 /// itself.
821 ///
822 /// Some context arguments are not universally applicable. A context
823 /// argument that isn't available for the given event type will result in
824 /// the event handler being skipped and an error being logged. The following
825 /// context argument types are only available for a subset of event types:
826 ///
827 /// * [`Room`] is only available for room-specific events, i.e. not for
828 /// events like global account data events or presence events.
829 ///
830 /// You can provide custom context via
831 /// [`add_event_handler_context`](Client::add_event_handler_context) and
832 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
833 /// into the event handler.
834 ///
835 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
836 ///
837 /// # Examples
838 ///
839 /// ```no_run
840 /// use matrix_sdk::{
841 /// deserialized_responses::EncryptionInfo,
842 /// event_handler::Ctx,
843 /// ruma::{
844 /// events::{
845 /// macros::EventContent,
846 /// push_rules::PushRulesEvent,
847 /// room::{
848 /// message::SyncRoomMessageEvent,
849 /// topic::SyncRoomTopicEvent,
850 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
851 /// },
852 /// },
853 /// push::Action,
854 /// Int, MilliSecondsSinceUnixEpoch,
855 /// },
856 /// Client, Room,
857 /// };
858 /// use serde::{Deserialize, Serialize};
859 ///
860 /// # async fn example(client: Client) {
861 /// client.add_event_handler(
862 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
863 /// // Common usage: Room event plus room and client.
864 /// },
865 /// );
866 /// client.add_event_handler(
867 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
868 /// async move {
869 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
870 /// // unencrypted events and events that were decrypted by the SDK.
871 /// }
872 /// },
873 /// );
874 /// client.add_event_handler(
875 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
876 /// async move {
877 /// // A `Vec<Action>` parameter allows you to know which push actions
878 /// // are applicable for an event. For example, an event with
879 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
880 /// // in the timeline.
881 /// }
882 /// },
883 /// );
884 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
885 /// // You can omit any or all arguments after the first.
886 /// });
887 ///
888 /// // Registering a temporary event handler:
889 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
890 /// /* Event handler */
891 /// });
892 /// client.remove_event_handler(handle);
893 ///
894 /// // Registering custom event handler context:
895 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
896 /// struct MyContext {
897 /// number: usize,
898 /// }
899 /// client.add_event_handler_context(MyContext { number: 5 });
900 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
901 /// // Use the context
902 /// });
903 ///
904 /// // This will handle membership events in joined rooms. Invites are special, see below.
905 /// client.add_event_handler(
906 /// |ev: SyncRoomMemberEvent| async move {},
907 /// );
908 ///
909 /// // To handle state events in invited rooms (including invite membership events),
910 /// // `StrippedRoomMemberEvent` should be used.
911 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
912 /// client.add_event_handler(
913 /// |ev: StrippedRoomMemberEvent| async move {},
914 /// );
915 ///
916 /// // Custom events work exactly the same way, you just need to declare
917 /// // the content struct and use the EventContent derive macro on it.
918 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
919 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
920 /// struct TokenEventContent {
921 /// token: String,
922 /// #[serde(rename = "exp")]
923 /// expires_at: MilliSecondsSinceUnixEpoch,
924 /// }
925 ///
926 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
927 /// todo!("Display the token");
928 /// });
929 ///
930 /// // Event handler closures can also capture local variables.
931 /// // Make sure they are cheap to clone though, because they will be cloned
932 /// // every time the closure is called.
933 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
934 ///
935 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
936 /// println!("Calling the handler with identifier {data}");
937 /// });
938 /// # }
939 /// ```
940 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
941 where
942 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
943 H: EventHandler<Ev, Ctx>,
944 {
945 self.add_event_handler_impl(handler, None)
946 }
947
948 /// Register a handler for a specific room, and event type.
949 ///
950 /// This method works the same way as
951 /// [`add_event_handler`][Self::add_event_handler], except that the handler
952 /// will only be called for events in the room with the specified ID. See
953 /// that method for more details on event handler functions.
954 ///
955 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
956 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
957 /// your use case.
958 pub fn add_room_event_handler<Ev, Ctx, H>(
959 &self,
960 room_id: &RoomId,
961 handler: H,
962 ) -> EventHandlerHandle
963 where
964 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
965 H: EventHandler<Ev, Ctx>,
966 {
967 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
968 }
969
970 /// Observe a specific event type.
971 ///
972 /// `Ev` represents the kind of event that will be observed. `Ctx`
973 /// represents the context that will come with the event. It relies on the
974 /// same mechanism as [`Client::add_event_handler`]. The main difference is
975 /// that it returns an [`ObservableEventHandler`] and doesn't require a
976 /// user-defined closure. It is possible to subscribe to the
977 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
978 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
979 /// Ctx)`.
980 ///
981 /// Be careful that only the most recent value can be observed. Subscribers
982 /// are notified when a new value is sent, but there is no guarantee
983 /// that they will see all values.
984 ///
985 /// # Example
986 ///
987 /// Let's see a classical usage:
988 ///
989 /// ```
990 /// use futures_util::StreamExt as _;
991 /// use matrix_sdk::{
992 /// Client, Room,
993 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
994 /// };
995 ///
996 /// # async fn example(client: Client) -> Option<()> {
997 /// let observer =
998 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
999 ///
1000 /// let mut subscriber = observer.subscribe();
1001 ///
1002 /// let (event, (room, push_actions)) = subscriber.next().await?;
1003 /// # Some(())
1004 /// # }
1005 /// ```
1006 ///
1007 /// Now let's see how to get several contexts that can be useful for you:
1008 ///
1009 /// ```
1010 /// use matrix_sdk::{
1011 /// Client, Room,
1012 /// deserialized_responses::EncryptionInfo,
1013 /// ruma::{
1014 /// events::room::{
1015 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1016 /// },
1017 /// push::Action,
1018 /// },
1019 /// };
1020 ///
1021 /// # async fn example(client: Client) {
1022 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1023 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1024 ///
1025 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1026 /// // to distinguish between unencrypted events and events that were decrypted
1027 /// // by the SDK.
1028 /// let _ = client
1029 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1030 /// );
1031 ///
1032 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1033 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1034 /// // should be highlighted in the timeline.
1035 /// let _ =
1036 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1037 ///
1038 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1039 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1040 /// # }
1041 /// ```
1042 ///
1043 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1044 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1045 where
1046 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1047 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1048 {
1049 self.observe_room_events_impl(None)
1050 }
1051
1052 /// Observe a specific room, and event type.
1053 ///
1054 /// This method works the same way as [`Client::observe_events`], except
1055 /// that the observability will only be applied for events in the room with
1056 /// the specified ID. See that method for more details.
1057 ///
1058 /// Be careful that only the most recent value can be observed. Subscribers
1059 /// are notified when a new value is sent, but there is no guarantee
1060 /// that they will see all values.
1061 pub fn observe_room_events<Ev, Ctx>(
1062 &self,
1063 room_id: &RoomId,
1064 ) -> ObservableEventHandler<(Ev, Ctx)>
1065 where
1066 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1067 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1068 {
1069 self.observe_room_events_impl(Some(room_id.to_owned()))
1070 }
1071
1072 /// Shared implementation for `Client::observe_events` and
1073 /// `Client::observe_room_events`.
1074 fn observe_room_events_impl<Ev, Ctx>(
1075 &self,
1076 room_id: Option<OwnedRoomId>,
1077 ) -> ObservableEventHandler<(Ev, Ctx)>
1078 where
1079 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1080 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1081 {
1082 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1083 // new value.
1084 let shared_observable = SharedObservable::new(None);
1085
1086 ObservableEventHandler::new(
1087 shared_observable.clone(),
1088 self.event_handler_drop_guard(self.add_event_handler_impl(
1089 move |event: Ev, context: Ctx| {
1090 shared_observable.set(Some((event, context)));
1091
1092 ready(())
1093 },
1094 room_id,
1095 )),
1096 )
1097 }
1098
1099 /// Remove the event handler associated with the handle.
1100 ///
1101 /// Note that you **must not** call `remove_event_handler` from the
1102 /// non-async part of an event handler, that is:
1103 ///
1104 /// ```ignore
1105 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1106 /// // ⚠ this will cause a deadlock ⚠
1107 /// client.remove_event_handler(handle);
1108 ///
1109 /// async move {
1110 /// // removing the event handler here is fine
1111 /// client.remove_event_handler(handle);
1112 /// }
1113 /// })
1114 /// ```
1115 ///
1116 /// Note also that handlers that remove themselves will still execute with
1117 /// events received in the same sync cycle.
1118 ///
1119 /// # Arguments
1120 ///
1121 /// `handle` - The [`EventHandlerHandle`] that is returned when
1122 /// registering the event handler with [`Client::add_event_handler`].
1123 ///
1124 /// # Examples
1125 ///
1126 /// ```no_run
1127 /// # use url::Url;
1128 /// # use tokio::sync::mpsc;
1129 /// #
1130 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1131 /// #
1132 /// use matrix_sdk::{
1133 /// Client, event_handler::EventHandlerHandle,
1134 /// ruma::events::room::member::SyncRoomMemberEvent,
1135 /// };
1136 /// #
1137 /// # futures_executor::block_on(async {
1138 /// # let client = matrix_sdk::Client::builder()
1139 /// # .homeserver_url(homeserver)
1140 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1141 /// # .build()
1142 /// # .await
1143 /// # .unwrap();
1144 ///
1145 /// client.add_event_handler(
1146 /// |ev: SyncRoomMemberEvent,
1147 /// client: Client,
1148 /// handle: EventHandlerHandle| async move {
1149 /// // Common usage: Check arriving Event is the expected one
1150 /// println!("Expected RoomMemberEvent received!");
1151 /// client.remove_event_handler(handle);
1152 /// },
1153 /// );
1154 /// # });
1155 /// ```
1156 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1157 self.inner.event_handlers.remove(handle);
1158 }
1159
1160 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1161 /// the given handle.
1162 ///
1163 /// When the returned value is dropped, the event handler will be removed.
1164 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1165 EventHandlerDropGuard::new(handle, self.clone())
1166 }
1167
1168 /// Add an arbitrary value for use as event handler context.
1169 ///
1170 /// The value can be obtained in an event handler by adding an argument of
1171 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1172 ///
1173 /// If a value of the same type has been added before, it will be
1174 /// overwritten.
1175 ///
1176 /// # Examples
1177 ///
1178 /// ```no_run
1179 /// use matrix_sdk::{
1180 /// Room, event_handler::Ctx,
1181 /// ruma::events::room::message::SyncRoomMessageEvent,
1182 /// };
1183 /// # #[derive(Clone)]
1184 /// # struct SomeType;
1185 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1186 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1187 /// # futures_executor::block_on(async {
1188 /// # let client = matrix_sdk::Client::builder()
1189 /// # .homeserver_url(homeserver)
1190 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1191 /// # .build()
1192 /// # .await
1193 /// # .unwrap();
1194 ///
1195 /// // Handle used to send messages to the UI part of the app
1196 /// let my_gui_handle: SomeType = obtain_gui_handle();
1197 ///
1198 /// client.add_event_handler_context(my_gui_handle.clone());
1199 /// client.add_event_handler(
1200 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1201 /// async move {
1202 /// // gui_handle.send(DisplayMessage { message: ev });
1203 /// }
1204 /// },
1205 /// );
1206 /// # });
1207 /// ```
1208 pub fn add_event_handler_context<T>(&self, ctx: T)
1209 where
1210 T: Clone + Send + Sync + 'static,
1211 {
1212 self.inner.event_handlers.add_context(ctx);
1213 }
1214
1215 /// Register a handler for a notification.
1216 ///
1217 /// Similar to [`Client::add_event_handler`], but only allows functions
1218 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1219 /// [`Client`] for now.
1220 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1221 where
1222 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1223 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1224 {
1225 self.inner.notification_handlers.write().await.push(Box::new(
1226 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1227 ));
1228
1229 self
1230 }
1231
1232 /// Subscribe to all updates for the room with the given ID.
1233 ///
1234 /// The returned receiver will receive a new message for each sync response
1235 /// that contains updates for that room.
1236 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1237 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1238 btree_map::Entry::Vacant(entry) => {
1239 let (tx, rx) = broadcast::channel(8);
1240 entry.insert(tx);
1241 rx
1242 }
1243 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1244 }
1245 }
1246
1247 /// Subscribe to all updates to all rooms, whenever any has been received in
1248 /// a sync response.
1249 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1250 self.inner.room_updates_sender.subscribe()
1251 }
1252
1253 pub(crate) async fn notification_handlers(
1254 &self,
1255 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1256 self.inner.notification_handlers.read().await
1257 }
1258
1259 /// Get all the rooms the client knows about.
1260 ///
1261 /// This will return the list of joined, invited, and left rooms.
1262 pub fn rooms(&self) -> Vec<Room> {
1263 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1264 }
1265
1266 /// Get all the rooms the client knows about, filtered by room state.
1267 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1268 self.base_client()
1269 .rooms_filtered(filter)
1270 .into_iter()
1271 .map(|room| Room::new(self.clone(), room))
1272 .collect()
1273 }
1274
1275 /// Get a stream of all the rooms, in addition to the existing rooms.
1276 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1277 let (rooms, stream) = self.base_client().rooms_stream();
1278
1279 let map_room = |room| Room::new(self.clone(), room);
1280
1281 (
1282 rooms.into_iter().map(map_room).collect(),
1283 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1284 )
1285 }
1286
1287 /// Returns the joined rooms this client knows about.
1288 pub fn joined_rooms(&self) -> Vec<Room> {
1289 self.rooms_filtered(RoomStateFilter::JOINED)
1290 }
1291
1292 /// Returns the invited rooms this client knows about.
1293 pub fn invited_rooms(&self) -> Vec<Room> {
1294 self.rooms_filtered(RoomStateFilter::INVITED)
1295 }
1296
1297 /// Returns the left rooms this client knows about.
1298 pub fn left_rooms(&self) -> Vec<Room> {
1299 self.rooms_filtered(RoomStateFilter::LEFT)
1300 }
1301
1302 /// Returns the joined space rooms this client knows about.
1303 pub fn joined_space_rooms(&self) -> Vec<Room> {
1304 self.base_client()
1305 .rooms_filtered(RoomStateFilter::JOINED)
1306 .into_iter()
1307 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1308 .collect()
1309 }
1310
1311 /// Get a room with the given room id.
1312 ///
1313 /// # Arguments
1314 ///
1315 /// `room_id` - The unique id of the room that should be fetched.
1316 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1317 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1318 }
1319
1320 /// Gets the preview of a room, whether the current user has joined it or
1321 /// not.
1322 pub async fn get_room_preview(
1323 &self,
1324 room_or_alias_id: &RoomOrAliasId,
1325 via: Vec<OwnedServerName>,
1326 ) -> Result<RoomPreview> {
1327 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1328 Ok(room_id) => room_id.to_owned(),
1329 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1330 };
1331
1332 if let Some(room) = self.get_room(&room_id) {
1333 // The cached data can only be trusted if the room state is joined or
1334 // banned: for invite and knock rooms, no updates will be received
1335 // for the rooms after the invite/knock action took place so we may
1336 // have very out to date data for important fields such as
1337 // `join_rule`. For left rooms, the homeserver should return the latest info.
1338 match room.state() {
1339 RoomState::Joined | RoomState::Banned => {
1340 return Ok(RoomPreview::from_known_room(&room).await);
1341 }
1342 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1343 }
1344 }
1345
1346 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1347 }
1348
1349 /// Resolve a room alias to a room id and a list of servers which know
1350 /// about it.
1351 ///
1352 /// # Arguments
1353 ///
1354 /// `room_alias` - The room alias to be resolved.
1355 pub async fn resolve_room_alias(
1356 &self,
1357 room_alias: &RoomAliasId,
1358 ) -> HttpResult<get_alias::v3::Response> {
1359 let request = get_alias::v3::Request::new(room_alias.to_owned());
1360 self.send(request).await
1361 }
1362
1363 /// Checks if a room alias is not in use yet.
1364 ///
1365 /// Returns:
1366 /// - `Ok(true)` if the room alias is available.
1367 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1368 /// status code).
1369 /// - An `Err` otherwise.
1370 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1371 match self.resolve_room_alias(alias).await {
1372 // The room alias was resolved, so it's already in use.
1373 Ok(_) => Ok(false),
1374 Err(error) => {
1375 match error.client_api_error_kind() {
1376 // The room alias wasn't found, so it's available.
1377 Some(ErrorKind::NotFound) => Ok(true),
1378 _ => Err(error),
1379 }
1380 }
1381 }
1382 }
1383
1384 /// Adds a new room alias associated with a room to the room directory.
1385 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1386 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1387 self.send(request).await?;
1388 Ok(())
1389 }
1390
1391 /// Removes a room alias from the room directory.
1392 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1393 let request = delete_alias::v3::Request::new(alias.to_owned());
1394 self.send(request).await?;
1395 Ok(())
1396 }
1397
1398 /// Update the homeserver from the login response well-known if needed.
1399 ///
1400 /// # Arguments
1401 ///
1402 /// * `login_well_known` - The `well_known` field from a successful login
1403 /// response.
1404 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1405 if self.inner.respect_login_well_known
1406 && let Some(well_known) = login_well_known
1407 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1408 {
1409 self.set_homeserver(homeserver);
1410 }
1411 }
1412
1413 /// Similar to [`Client::restore_session_with`], with
1414 /// [`RoomLoadSettings::default()`].
1415 ///
1416 /// # Panics
1417 ///
1418 /// Panics if a session was already restored or logged in.
1419 #[instrument(skip_all)]
1420 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1421 self.restore_session_with(session, RoomLoadSettings::default()).await
1422 }
1423
1424 /// Restore a session previously logged-in using one of the available
1425 /// authentication APIs. The number of rooms to restore is controlled by
1426 /// [`RoomLoadSettings`].
1427 ///
1428 /// See the documentation of the corresponding authentication API's
1429 /// `restore_session` method for more information.
1430 ///
1431 /// # Panics
1432 ///
1433 /// Panics if a session was already restored or logged in.
1434 #[instrument(skip_all)]
1435 pub async fn restore_session_with(
1436 &self,
1437 session: impl Into<AuthSession>,
1438 room_load_settings: RoomLoadSettings,
1439 ) -> Result<()> {
1440 let session = session.into();
1441 match session {
1442 AuthSession::Matrix(session) => {
1443 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1444 }
1445 AuthSession::OAuth(session) => {
1446 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1447 }
1448 }
1449 }
1450
1451 /// Refresh the access token using the authentication API used to log into
1452 /// this session.
1453 ///
1454 /// See the documentation of the authentication API's `refresh_access_token`
1455 /// method for more information.
1456 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1457 let Some(auth_api) = self.auth_api() else {
1458 return Err(RefreshTokenError::RefreshTokenRequired);
1459 };
1460
1461 match auth_api {
1462 AuthApi::Matrix(api) => {
1463 trace!("Token refresh: Using the homeserver.");
1464 Box::pin(api.refresh_access_token()).await?;
1465 }
1466 AuthApi::OAuth(api) => {
1467 trace!("Token refresh: Using OAuth 2.0.");
1468 Box::pin(api.refresh_access_token()).await?;
1469 }
1470 }
1471
1472 Ok(())
1473 }
1474
1475 /// Log out the current session using the proper authentication API.
1476 ///
1477 /// # Errors
1478 ///
1479 /// Returns an error if the session is not authenticated or if an error
1480 /// occurred while making the request to the server.
1481 pub async fn logout(&self) -> Result<(), Error> {
1482 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1483 match auth_api {
1484 AuthApi::Matrix(matrix_auth) => {
1485 matrix_auth.logout().await?;
1486 Ok(())
1487 }
1488 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1489 }
1490 }
1491
1492 /// Get or upload a sync filter.
1493 ///
1494 /// This method will either get a filter ID from the store or upload the
1495 /// filter definition to the homeserver and return the new filter ID.
1496 ///
1497 /// # Arguments
1498 ///
1499 /// * `filter_name` - The unique name of the filter, this name will be used
1500 /// locally to store and identify the filter ID returned by the server.
1501 ///
1502 /// * `definition` - The filter definition that should be uploaded to the
1503 /// server if no filter ID can be found in the store.
1504 ///
1505 /// # Examples
1506 ///
1507 /// ```no_run
1508 /// # use matrix_sdk::{
1509 /// # Client, config::SyncSettings,
1510 /// # ruma::api::client::{
1511 /// # filter::{
1512 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1513 /// # },
1514 /// # sync::sync_events::v3::Filter,
1515 /// # }
1516 /// # };
1517 /// # use url::Url;
1518 /// # async {
1519 /// # let homeserver = Url::parse("http://example.com").unwrap();
1520 /// # let client = Client::new(homeserver).await.unwrap();
1521 /// let mut filter = FilterDefinition::default();
1522 ///
1523 /// // Let's enable member lazy loading.
1524 /// filter.room.state.lazy_load_options =
1525 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1526 ///
1527 /// let filter_id = client
1528 /// .get_or_upload_filter("sync", filter)
1529 /// .await
1530 /// .unwrap();
1531 ///
1532 /// let sync_settings = SyncSettings::new()
1533 /// .filter(Filter::FilterId(filter_id));
1534 ///
1535 /// let response = client.sync_once(sync_settings).await.unwrap();
1536 /// # };
1537 #[instrument(skip(self, definition))]
1538 pub async fn get_or_upload_filter(
1539 &self,
1540 filter_name: &str,
1541 definition: FilterDefinition,
1542 ) -> Result<String> {
1543 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1544 debug!("Found filter locally");
1545 Ok(filter)
1546 } else {
1547 debug!("Didn't find filter locally");
1548 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1549 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1550 let response = self.send(request).await?;
1551
1552 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1553
1554 Ok(response.filter_id)
1555 }
1556 }
1557
1558 /// Prepare to join a room by ID, by getting the current details about it
1559 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1560 let room = self.get_room(room_id)?;
1561
1562 let inviter = match room.invite_details().await {
1563 Ok(details) => details.inviter,
1564 Err(Error::WrongRoomState(_)) => None,
1565 Err(e) => {
1566 warn!("Error fetching invite details for room: {e:?}");
1567 None
1568 }
1569 };
1570
1571 Some(PreJoinRoomInfo { inviter })
1572 }
1573
1574 /// Finish joining a room.
1575 ///
1576 /// If the room was an invite that should be marked as a DM, will include it
1577 /// in the DM event after creating the joined room.
1578 ///
1579 /// If encrypted history sharing is enabled, will check to see if we have a
1580 /// key bundle, and import it if so.
1581 ///
1582 /// # Arguments
1583 ///
1584 /// * `room_id` - The `RoomId` of the room that was joined.
1585 /// * `pre_join_room_info` - Information about the room before we joined.
1586 async fn finish_join_room(
1587 &self,
1588 room_id: &RoomId,
1589 pre_join_room_info: Option<PreJoinRoomInfo>,
1590 ) -> Result<Room> {
1591 info!(?room_id, ?pre_join_room_info, "Completing room join");
1592 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1593 room.state() == RoomState::Invited
1594 && room.is_direct().await.unwrap_or_else(|e| {
1595 warn!(%room_id, "is_direct() failed: {e}");
1596 false
1597 })
1598 } else {
1599 false
1600 };
1601
1602 let base_room = self
1603 .base_client()
1604 .room_joined(
1605 room_id,
1606 pre_join_room_info
1607 .as_ref()
1608 .and_then(|info| info.inviter.as_ref())
1609 .map(|i| i.user_id().to_owned()),
1610 )
1611 .await?;
1612 let room = Room::new(self.clone(), base_room);
1613
1614 if mark_as_dm {
1615 room.set_is_direct(true).await?;
1616 }
1617
1618 #[cfg(feature = "e2e-encryption")]
1619 if self.inner.enable_share_history_on_invite
1620 && let Some(inviter) =
1621 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1622 {
1623 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1624 .await?;
1625 }
1626
1627 // Suppress "unused variable" and "unused field" lints
1628 #[cfg(not(feature = "e2e-encryption"))]
1629 let _ = pre_join_room_info.map(|i| i.inviter);
1630
1631 Ok(room)
1632 }
1633
1634 /// Join a room by `RoomId`.
1635 ///
1636 /// Returns the `Room` in the joined state.
1637 ///
1638 /// # Arguments
1639 ///
1640 /// * `room_id` - The `RoomId` of the room to be joined.
1641 #[instrument(skip(self))]
1642 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1643 // See who invited us to this room, if anyone. Note we have to do this before
1644 // making the `/join` request, otherwise we could race against the sync.
1645 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1646
1647 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1648 let response = self.send(request).await?;
1649 self.finish_join_room(&response.room_id, pre_join_info).await
1650 }
1651
1652 /// Join a room by `RoomOrAliasId`.
1653 ///
1654 /// Returns the `Room` in the joined state.
1655 ///
1656 /// # Arguments
1657 ///
1658 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1659 /// alias looks like `#name:example.com`.
1660 /// * `server_names` - The server names to be used for resolving the alias,
1661 /// if needs be.
1662 #[instrument(skip(self))]
1663 pub async fn join_room_by_id_or_alias(
1664 &self,
1665 alias: &RoomOrAliasId,
1666 server_names: &[OwnedServerName],
1667 ) -> Result<Room> {
1668 let pre_join_info = {
1669 match alias.try_into() {
1670 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1671 Err(_) => {
1672 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1673 // responding to an invitation to the room, and therefore don't need to handle
1674 // things that happen as a result of invites.
1675 None
1676 }
1677 }
1678 };
1679 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1680 via: server_names.to_owned(),
1681 });
1682 let response = self.send(request).await?;
1683 self.finish_join_room(&response.room_id, pre_join_info).await
1684 }
1685
1686 /// Search the homeserver's directory of public rooms.
1687 ///
1688 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1689 /// a `get_public_rooms::Response`.
1690 ///
1691 /// # Arguments
1692 ///
1693 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1694 ///
1695 /// * `since` - Pagination token from a previous request.
1696 ///
1697 /// * `server` - The name of the server, if `None` the requested server is
1698 /// used.
1699 ///
1700 /// # Examples
1701 /// ```no_run
1702 /// use matrix_sdk::Client;
1703 /// # use url::Url;
1704 /// # let homeserver = Url::parse("http://example.com").unwrap();
1705 /// # let limit = Some(10);
1706 /// # let since = Some("since token");
1707 /// # let server = Some("servername.com".try_into().unwrap());
1708 /// # async {
1709 /// let mut client = Client::new(homeserver).await.unwrap();
1710 ///
1711 /// client.public_rooms(limit, since, server).await;
1712 /// # };
1713 /// ```
1714 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1715 pub async fn public_rooms(
1716 &self,
1717 limit: Option<u32>,
1718 since: Option<&str>,
1719 server: Option<&ServerName>,
1720 ) -> HttpResult<get_public_rooms::v3::Response> {
1721 let limit = limit.map(UInt::from);
1722
1723 let request = assign!(get_public_rooms::v3::Request::new(), {
1724 limit,
1725 since: since.map(ToOwned::to_owned),
1726 server: server.map(ToOwned::to_owned),
1727 });
1728 self.send(request).await
1729 }
1730
1731 /// Create a room with the given parameters.
1732 ///
1733 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1734 /// created room.
1735 ///
1736 /// If you want to create a direct message with one specific user, you can
1737 /// use [`create_dm`][Self::create_dm], which is more convenient than
1738 /// assembling the [`create_room::v3::Request`] yourself.
1739 ///
1740 /// If the `is_direct` field of the request is set to `true` and at least
1741 /// one user is invited, the room will be automatically added to the direct
1742 /// rooms in the account data.
1743 ///
1744 /// # Examples
1745 ///
1746 /// ```no_run
1747 /// use matrix_sdk::{
1748 /// Client,
1749 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1750 /// };
1751 /// # use url::Url;
1752 /// #
1753 /// # async {
1754 /// # let homeserver = Url::parse("http://example.com").unwrap();
1755 /// let request = CreateRoomRequest::new();
1756 /// let client = Client::new(homeserver).await.unwrap();
1757 /// assert!(client.create_room(request).await.is_ok());
1758 /// # };
1759 /// ```
1760 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1761 let invite = request.invite.clone();
1762 let is_direct_room = request.is_direct;
1763 let response = self.send(request).await?;
1764
1765 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1766
1767 let joined_room = Room::new(self.clone(), base_room);
1768
1769 if is_direct_room
1770 && !invite.is_empty()
1771 && let Err(error) =
1772 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1773 {
1774 // FIXME: Retry in the background
1775 error!("Failed to mark room as DM: {error}");
1776 }
1777
1778 Ok(joined_room)
1779 }
1780
1781 /// Create a DM room.
1782 ///
1783 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1784 /// given user being invited, the room marked `is_direct` and both the
1785 /// creator and invitee getting the default maximum power level.
1786 ///
1787 /// If the `e2e-encryption` feature is enabled, the room will also be
1788 /// encrypted.
1789 ///
1790 /// # Arguments
1791 ///
1792 /// * `user_id` - The ID of the user to create a DM for.
1793 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1794 #[cfg(feature = "e2e-encryption")]
1795 let initial_state = vec![
1796 InitialStateEvent::with_empty_state_key(
1797 RoomEncryptionEventContent::with_recommended_defaults(),
1798 )
1799 .to_raw_any(),
1800 ];
1801
1802 #[cfg(not(feature = "e2e-encryption"))]
1803 let initial_state = vec![];
1804
1805 let request = assign!(create_room::v3::Request::new(), {
1806 invite: vec![user_id.to_owned()],
1807 is_direct: true,
1808 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1809 initial_state,
1810 });
1811
1812 self.create_room(request).await
1813 }
1814
1815 /// Get the existing DM room with the given user, if any.
1816 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1817 let rooms = self.joined_rooms();
1818
1819 // Find the room we share with the `user_id` and only with `user_id`
1820 let room = rooms.into_iter().find(|r| {
1821 let targets = r.direct_targets();
1822 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1823 });
1824
1825 trace!(?user_id, ?room, "Found DM room with user");
1826 room
1827 }
1828
1829 /// Search the homeserver's directory for public rooms with a filter.
1830 ///
1831 /// # Arguments
1832 ///
1833 /// * `room_search` - The easiest way to create this request is using the
1834 /// `get_public_rooms_filtered::Request` itself.
1835 ///
1836 /// # Examples
1837 ///
1838 /// ```no_run
1839 /// # use url::Url;
1840 /// # use matrix_sdk::Client;
1841 /// # async {
1842 /// # let homeserver = Url::parse("http://example.com")?;
1843 /// use matrix_sdk::ruma::{
1844 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1845 /// };
1846 /// # let mut client = Client::new(homeserver).await?;
1847 ///
1848 /// let mut filter = Filter::new();
1849 /// filter.generic_search_term = Some("rust".to_owned());
1850 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1851 /// request.filter = filter;
1852 ///
1853 /// let response = client.public_rooms_filtered(request).await?;
1854 ///
1855 /// for room in response.chunk {
1856 /// println!("Found room {room:?}");
1857 /// }
1858 /// # anyhow::Ok(()) };
1859 /// ```
1860 pub async fn public_rooms_filtered(
1861 &self,
1862 request: get_public_rooms_filtered::v3::Request,
1863 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1864 self.send(request).await
1865 }
1866
1867 /// Send an arbitrary request to the server, without updating client state.
1868 ///
1869 /// **Warning:** Because this method *does not* update the client state, it
1870 /// is important to make sure that you account for this yourself, and
1871 /// use wrapper methods where available. This method should *only* be
1872 /// used if a wrapper method for the endpoint you'd like to use is not
1873 /// available.
1874 ///
1875 /// # Arguments
1876 ///
1877 /// * `request` - A filled out and valid request for the endpoint to be hit
1878 ///
1879 /// * `timeout` - An optional request timeout setting, this overrides the
1880 /// default request setting if one was set.
1881 ///
1882 /// # Examples
1883 ///
1884 /// ```no_run
1885 /// # use matrix_sdk::{Client, config::SyncSettings};
1886 /// # use url::Url;
1887 /// # async {
1888 /// # let homeserver = Url::parse("http://localhost:8080")?;
1889 /// # let mut client = Client::new(homeserver).await?;
1890 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1891 ///
1892 /// // First construct the request you want to make
1893 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1894 /// // for all available Endpoints
1895 /// let user_id = user_id!("@example:localhost").to_owned();
1896 /// let request = profile::get_profile::v3::Request::new(user_id);
1897 ///
1898 /// // Start the request using Client::send()
1899 /// let response = client.send(request).await?;
1900 ///
1901 /// // Check the corresponding Response struct to find out what types are
1902 /// // returned
1903 /// # anyhow::Ok(()) };
1904 /// ```
1905 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1906 where
1907 Request: OutgoingRequest + Clone + Debug,
1908 for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1909 Request::PathBuilder: SupportedPathBuilder,
1910 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1911 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1912 {
1913 SendRequest {
1914 client: self.clone(),
1915 request,
1916 config: None,
1917 send_progress: Default::default(),
1918 }
1919 }
1920
1921 pub(crate) async fn send_inner<Request>(
1922 &self,
1923 request: Request,
1924 config: Option<RequestConfig>,
1925 send_progress: SharedObservable<TransmissionProgress>,
1926 ) -> HttpResult<Request::IncomingResponse>
1927 where
1928 Request: OutgoingRequest + Debug,
1929 for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1930 Request::PathBuilder: SupportedPathBuilder,
1931 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1932 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1933 {
1934 let homeserver = self.homeserver().to_string();
1935 let access_token = self.access_token();
1936 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1937
1938 let path_builder_input =
1939 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1940
1941 let result = self
1942 .inner
1943 .http_client
1944 .send(
1945 request,
1946 config,
1947 homeserver,
1948 access_token.as_deref(),
1949 path_builder_input,
1950 send_progress,
1951 )
1952 .await;
1953
1954 if let Err(Some(ErrorKind::UnknownToken { .. })) =
1955 result.as_ref().map_err(HttpError::client_api_error_kind)
1956 && let Some(access_token) = &access_token
1957 {
1958 // Mark the access token as expired.
1959 self.auth_ctx().set_access_token_expired(access_token);
1960 }
1961
1962 result
1963 }
1964
1965 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1966 _ = self
1967 .inner
1968 .auth_ctx
1969 .session_change_sender
1970 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1971 }
1972
1973 /// Fetches server versions from network; no caching.
1974 pub async fn fetch_server_versions(
1975 &self,
1976 request_config: Option<RequestConfig>,
1977 ) -> HttpResult<get_supported_versions::Response> {
1978 // Since this was called by the user, try to refresh the access token if
1979 // necessary.
1980 self.fetch_server_versions_inner(false, request_config).await
1981 }
1982
1983 /// Fetches server versions from network; no caching.
1984 ///
1985 /// If the access token is expired and `failsafe` is `false`, this will
1986 /// attempt to refresh the access token, otherwise this will try to make an
1987 /// unauthenticated request instead.
1988 pub(crate) async fn fetch_server_versions_inner(
1989 &self,
1990 failsafe: bool,
1991 request_config: Option<RequestConfig>,
1992 ) -> HttpResult<get_supported_versions::Response> {
1993 if !failsafe {
1994 // `Client::send()` handles refreshing access tokens.
1995 return self
1996 .send(get_supported_versions::Request::new())
1997 .with_request_config(request_config)
1998 .await;
1999 }
2000
2001 let homeserver = self.homeserver().to_string();
2002
2003 // If we have a fresh access token, try with it first.
2004 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2005 && self.auth_ctx().has_valid_access_token()
2006 && let Some(access_token) = self.access_token()
2007 {
2008 let result = self
2009 .inner
2010 .http_client
2011 .send(
2012 get_supported_versions::Request::new(),
2013 request_config,
2014 homeserver.clone(),
2015 Some(&access_token),
2016 (),
2017 Default::default(),
2018 )
2019 .await;
2020
2021 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2022 result.as_ref().map_err(HttpError::client_api_error_kind)
2023 {
2024 // If the access token is actually expired, mark it as expired and fallback to
2025 // the unauthenticated request below.
2026 self.auth_ctx().set_access_token_expired(&access_token);
2027 } else {
2028 // If the request succeeded or it's an other error, just stop now.
2029 return result;
2030 }
2031 }
2032
2033 // Try without authentication.
2034 self.inner
2035 .http_client
2036 .send(
2037 get_supported_versions::Request::new(),
2038 request_config,
2039 homeserver.clone(),
2040 None,
2041 (),
2042 Default::default(),
2043 )
2044 .await
2045 }
2046
2047 /// Fetches client well_known from network; no caching.
2048 ///
2049 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2050 /// well-known contents.
2051 /// 2. If it's not, we try extracting the server name from the
2052 /// [`Client::user_id`] and building the server URL from it.
2053 /// 3. If we couldn't get the well-known contents with either the explicit
2054 /// server name or the implicit extracted one, we try the homeserver URL
2055 /// as a last resort.
2056 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2057 let homeserver = self.homeserver();
2058 let scheme = homeserver.scheme();
2059
2060 // Use the server name, either an explicit one or an implicit one taken from
2061 // the user id: sometimes we'll have only the homeserver url available and no
2062 // server name, but the server name can be extracted from the current user id.
2063 let server_url = self
2064 .server()
2065 .map(|server| server.to_string())
2066 // If the server name wasn't available, extract it from the user id and build a URL:
2067 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2068 // will be the same for the public server url, lacking a better candidate.
2069 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2070
2071 // If the server name is available, first try using it
2072 let response = if let Some(server_url) = server_url {
2073 // First try using the server name
2074 self.fetch_client_well_known_with_url(server_url).await
2075 } else {
2076 None
2077 };
2078
2079 // If we didn't get a well-known value yet, try with the homeserver url instead:
2080 if response.is_none() {
2081 // Sometimes people configure their well-known directly on the homeserver so use
2082 // this as a fallback when the server name is unknown.
2083 warn!(
2084 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2085 );
2086 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2087 } else {
2088 response
2089 }
2090 }
2091
2092 async fn fetch_client_well_known_with_url(
2093 &self,
2094 url: String,
2095 ) -> Option<discover_homeserver::Response> {
2096 let well_known = self
2097 .inner
2098 .http_client
2099 .send(
2100 discover_homeserver::Request::new(),
2101 Some(RequestConfig::short_retry()),
2102 url,
2103 None,
2104 (),
2105 Default::default(),
2106 )
2107 .await;
2108
2109 match well_known {
2110 Ok(well_known) => Some(well_known),
2111 Err(http_error) => {
2112 // It is perfectly valid to not have a well-known file.
2113 // Maybe we should check for a specific error code to be sure?
2114 warn!("Failed to fetch client well-known: {http_error}");
2115 None
2116 }
2117 }
2118 }
2119
2120 /// Load supported versions from storage, or fetch them from network and
2121 /// cache them.
2122 ///
2123 /// If `failsafe` is true, this will try to minimize side effects to avoid
2124 /// possible deadlocks.
2125 async fn load_or_fetch_supported_versions(
2126 &self,
2127 failsafe: bool,
2128 ) -> HttpResult<SupportedVersionsResponse> {
2129 match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2130 Ok(Some(stored)) => {
2131 if let Some(supported_versions) =
2132 stored.into_supported_versions().and_then(|value| value.into_data())
2133 {
2134 return Ok(supported_versions);
2135 }
2136 }
2137 Ok(None) => {
2138 // fallthrough: cache is empty
2139 }
2140 Err(err) => {
2141 warn!("error when loading cached supported versions: {err}");
2142 // fallthrough to network.
2143 }
2144 }
2145
2146 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2147 let supported_versions = SupportedVersionsResponse {
2148 versions: server_versions.versions,
2149 unstable_features: server_versions.unstable_features,
2150 };
2151
2152 // Only attempt to cache the result in storage if the request was authenticated.
2153 if self.auth_ctx().has_valid_access_token()
2154 && let Err(err) = self
2155 .state_store()
2156 .set_kv_data(
2157 StateStoreDataKey::SupportedVersions,
2158 StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2159 supported_versions.clone(),
2160 )),
2161 )
2162 .await
2163 {
2164 warn!("error when caching supported versions: {err}");
2165 }
2166
2167 Ok(supported_versions)
2168 }
2169
2170 pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2171 let supported_versions = &self.inner.caches.supported_versions;
2172
2173 if let CachedValue::Cached(val) = &*supported_versions.read().await {
2174 Some(val.clone())
2175 } else {
2176 None
2177 }
2178 }
2179
2180 /// Get the Matrix versions and features supported by the homeserver by
2181 /// fetching them from the server or the cache.
2182 ///
2183 /// This is equivalent to calling both [`Client::server_versions()`] and
2184 /// [`Client::unstable_features()`]. To always fetch the result from the
2185 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2186 /// and then `.as_supported_versions()` on the response.
2187 ///
2188 /// # Examples
2189 ///
2190 /// ```no_run
2191 /// use ruma::api::{FeatureFlag, MatrixVersion};
2192 /// # use matrix_sdk::{Client, config::SyncSettings};
2193 /// # use url::Url;
2194 /// # async {
2195 /// # let homeserver = Url::parse("http://localhost:8080")?;
2196 /// # let mut client = Client::new(homeserver).await?;
2197 ///
2198 /// let supported = client.supported_versions().await?;
2199 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2200 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2201 ///
2202 /// let msc_x_feature = FeatureFlag::from("msc_x");
2203 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2204 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2205 /// # anyhow::Ok(()) };
2206 /// ```
2207 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2208 self.supported_versions_inner(false).await
2209 }
2210
2211 /// Get the Matrix versions and features supported by the homeserver by
2212 /// fetching them from the server or the cache.
2213 ///
2214 /// If `failsafe` is true, this will try to minimize side effects to avoid
2215 /// possible deadlocks.
2216 pub(crate) async fn supported_versions_inner(
2217 &self,
2218 failsafe: bool,
2219 ) -> HttpResult<SupportedVersions> {
2220 let cached_supported_versions = &self.inner.caches.supported_versions;
2221 if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2222 return Ok(val.clone());
2223 }
2224
2225 let mut guarded_supported_versions = cached_supported_versions.write().await;
2226 if let CachedValue::Cached(val) = &*guarded_supported_versions {
2227 return Ok(val.clone());
2228 }
2229
2230 let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2231
2232 // Fill both unstable features and server versions at once.
2233 let mut supported_versions = supported.supported_versions();
2234 if supported_versions.versions.is_empty() {
2235 supported_versions.versions = [MatrixVersion::V1_0].into();
2236 }
2237
2238 // Only cache the result if the request was authenticated.
2239 if self.auth_ctx().has_valid_access_token() {
2240 *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2241 }
2242
2243 Ok(supported_versions)
2244 }
2245
2246 /// Get the Matrix versions and features supported by the homeserver by
2247 /// fetching them from the cache.
2248 ///
2249 /// For a version of this function that fetches the supported versions and
2250 /// features from the homeserver if the [`SupportedVersions`] aren't
2251 /// found in the cache, take a look at the [`Client::server_versions()`]
2252 /// method.
2253 ///
2254 /// # Examples
2255 ///
2256 /// ```no_run
2257 /// use ruma::api::{FeatureFlag, MatrixVersion};
2258 /// # use matrix_sdk::{Client, config::SyncSettings};
2259 /// # use url::Url;
2260 /// # async {
2261 /// # let homeserver = Url::parse("http://localhost:8080")?;
2262 /// # let mut client = Client::new(homeserver).await?;
2263 ///
2264 /// let supported =
2265 /// if let Some(supported) = client.supported_versions_cached().await? {
2266 /// supported
2267 /// } else {
2268 /// client.fetch_server_versions(None).await?.as_supported_versions()
2269 /// };
2270 ///
2271 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2272 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2273 ///
2274 /// let msc_x_feature = FeatureFlag::from("msc_x");
2275 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2276 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2277 /// # anyhow::Ok(()) };
2278 /// ```
2279 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2280 if let Some(cached) = self.get_cached_supported_versions().await {
2281 Ok(Some(cached))
2282 } else if let Some(stored) =
2283 self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2284 {
2285 if let Some(supported) =
2286 stored.into_supported_versions().and_then(|value| value.into_data())
2287 {
2288 Ok(Some(supported.supported_versions()))
2289 } else {
2290 Ok(None)
2291 }
2292 } else {
2293 Ok(None)
2294 }
2295 }
2296
2297 /// Get the Matrix versions supported by the homeserver by fetching them
2298 /// from the server or the cache.
2299 ///
2300 /// # Examples
2301 ///
2302 /// ```no_run
2303 /// use ruma::api::MatrixVersion;
2304 /// # use matrix_sdk::{Client, config::SyncSettings};
2305 /// # use url::Url;
2306 /// # async {
2307 /// # let homeserver = Url::parse("http://localhost:8080")?;
2308 /// # let mut client = Client::new(homeserver).await?;
2309 ///
2310 /// let server_versions = client.server_versions().await?;
2311 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2312 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2313 /// # anyhow::Ok(()) };
2314 /// ```
2315 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2316 Ok(self.supported_versions().await?.versions)
2317 }
2318
2319 /// Get the unstable features supported by the homeserver by fetching them
2320 /// from the server or the cache.
2321 ///
2322 /// # Examples
2323 ///
2324 /// ```no_run
2325 /// use matrix_sdk::ruma::api::FeatureFlag;
2326 /// # use matrix_sdk::{Client, config::SyncSettings};
2327 /// # use url::Url;
2328 /// # async {
2329 /// # let homeserver = Url::parse("http://localhost:8080")?;
2330 /// # let mut client = Client::new(homeserver).await?;
2331 ///
2332 /// let msc_x_feature = FeatureFlag::from("msc_x");
2333 /// let unstable_features = client.unstable_features().await?;
2334 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2335 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2336 /// # anyhow::Ok(()) };
2337 /// ```
2338 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2339 Ok(self.supported_versions().await?.features)
2340 }
2341
2342 /// Empty the supported versions and unstable features cache.
2343 ///
2344 /// Since the SDK caches the supported versions, it's possible to have a
2345 /// stale entry in the cache. This functions makes it possible to force
2346 /// reset it.
2347 pub async fn reset_supported_versions(&self) -> Result<()> {
2348 // Empty the in-memory caches.
2349 self.inner.caches.supported_versions.write().await.take();
2350
2351 // Empty the store cache.
2352 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2353 }
2354
2355 /// Load well-known from storage, or fetch it from network and cache it.
2356 async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2357 match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2358 Ok(Some(stored)) => {
2359 if let Some(well_known) =
2360 stored.into_well_known().and_then(|value| value.into_data())
2361 {
2362 return Ok(well_known);
2363 }
2364 }
2365 Ok(None) => {
2366 // fallthrough: cache is empty
2367 }
2368 Err(err) => {
2369 warn!("error when loading cached well-known: {err}");
2370 // fallthrough to network.
2371 }
2372 }
2373
2374 let well_known = self.fetch_client_well_known().await.map(Into::into);
2375
2376 // Attempt to cache the result in storage.
2377 if let Err(err) = self
2378 .state_store()
2379 .set_kv_data(
2380 StateStoreDataKey::WellKnown,
2381 StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2382 )
2383 .await
2384 {
2385 warn!("error when caching well-known: {err}");
2386 }
2387
2388 Ok(well_known)
2389 }
2390
2391 async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2392 let well_known = &self.inner.caches.well_known;
2393 if let CachedValue::Cached(val) = &*well_known.read().await {
2394 return Ok(val.clone());
2395 }
2396
2397 let mut guarded_well_known = well_known.write().await;
2398 if let CachedValue::Cached(val) = &*guarded_well_known {
2399 return Ok(val.clone());
2400 }
2401
2402 let well_known = self.load_or_fetch_well_known().await?;
2403
2404 *guarded_well_known = CachedValue::Cached(well_known.clone());
2405
2406 Ok(well_known)
2407 }
2408
2409 /// Get information about the homeserver's advertised RTC foci by fetching
2410 /// the well-known file from the server or the cache.
2411 ///
2412 /// # Examples
2413 /// ```no_run
2414 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2415 /// # use url::Url;
2416 /// # async {
2417 /// # let homeserver = Url::parse("http://localhost:8080")?;
2418 /// # let mut client = Client::new(homeserver).await?;
2419 /// let rtc_foci = client.rtc_foci().await?;
2420 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2421 /// RtcFocusInfo::LiveKit(info) => Some(info),
2422 /// _ => None,
2423 /// });
2424 /// if let Some(info) = default_livekit_focus_info {
2425 /// println!("Default LiveKit service URL: {}", info.service_url);
2426 /// }
2427 /// # anyhow::Ok(()) };
2428 /// ```
2429 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2430 let well_known = self.get_or_load_and_cache_well_known().await?;
2431
2432 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2433 }
2434
2435 /// Empty the well-known cache.
2436 ///
2437 /// Since the SDK caches the well-known, it's possible to have a stale entry
2438 /// in the cache. This functions makes it possible to force reset it.
2439 pub async fn reset_well_known(&self) -> Result<()> {
2440 // Empty the in-memory caches.
2441 self.inner.caches.well_known.write().await.take();
2442
2443 // Empty the store cache.
2444 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2445 }
2446
2447 /// Check whether MSC 4028 is enabled on the homeserver.
2448 ///
2449 /// # Examples
2450 ///
2451 /// ```no_run
2452 /// # use matrix_sdk::{Client, config::SyncSettings};
2453 /// # use url::Url;
2454 /// # async {
2455 /// # let homeserver = Url::parse("http://localhost:8080")?;
2456 /// # let mut client = Client::new(homeserver).await?;
2457 /// let msc4028_enabled =
2458 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2459 /// # anyhow::Ok(()) };
2460 /// ```
2461 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2462 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2463 }
2464
2465 /// Get information of all our own devices.
2466 ///
2467 /// # Examples
2468 ///
2469 /// ```no_run
2470 /// # use matrix_sdk::{Client, config::SyncSettings};
2471 /// # use url::Url;
2472 /// # async {
2473 /// # let homeserver = Url::parse("http://localhost:8080")?;
2474 /// # let mut client = Client::new(homeserver).await?;
2475 /// let response = client.devices().await?;
2476 ///
2477 /// for device in response.devices {
2478 /// println!(
2479 /// "Device: {} {}",
2480 /// device.device_id,
2481 /// device.display_name.as_deref().unwrap_or("")
2482 /// );
2483 /// }
2484 /// # anyhow::Ok(()) };
2485 /// ```
2486 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2487 let request = get_devices::v3::Request::new();
2488
2489 self.send(request).await
2490 }
2491
2492 /// Delete the given devices from the server.
2493 ///
2494 /// # Arguments
2495 ///
2496 /// * `devices` - The list of devices that should be deleted from the
2497 /// server.
2498 ///
2499 /// * `auth_data` - This request requires user interactive auth, the first
2500 /// request needs to set this to `None` and will always fail with an
2501 /// `UiaaResponse`. The response will contain information for the
2502 /// interactive auth and the same request needs to be made but this time
2503 /// with some `auth_data` provided.
2504 ///
2505 /// ```no_run
2506 /// # use matrix_sdk::{
2507 /// # ruma::{api::client::uiaa, device_id},
2508 /// # Client, Error, config::SyncSettings,
2509 /// # };
2510 /// # use serde_json::json;
2511 /// # use url::Url;
2512 /// # use std::collections::BTreeMap;
2513 /// # async {
2514 /// # let homeserver = Url::parse("http://localhost:8080")?;
2515 /// # let mut client = Client::new(homeserver).await?;
2516 /// let devices = &[device_id!("DEVICEID").to_owned()];
2517 ///
2518 /// if let Err(e) = client.delete_devices(devices, None).await {
2519 /// if let Some(info) = e.as_uiaa_response() {
2520 /// let mut password = uiaa::Password::new(
2521 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2522 /// "wordpass".to_owned(),
2523 /// );
2524 /// password.session = info.session.clone();
2525 ///
2526 /// client
2527 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2528 /// .await?;
2529 /// }
2530 /// }
2531 /// # anyhow::Ok(()) };
2532 pub async fn delete_devices(
2533 &self,
2534 devices: &[OwnedDeviceId],
2535 auth_data: Option<uiaa::AuthData>,
2536 ) -> HttpResult<delete_devices::v3::Response> {
2537 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2538 request.auth = auth_data;
2539
2540 self.send(request).await
2541 }
2542
2543 /// Change the display name of a device owned by the current user.
2544 ///
2545 /// Returns a `update_device::Response` which specifies the result
2546 /// of the operation.
2547 ///
2548 /// # Arguments
2549 ///
2550 /// * `device_id` - The ID of the device to change the display name of.
2551 /// * `display_name` - The new display name to set.
2552 pub async fn rename_device(
2553 &self,
2554 device_id: &DeviceId,
2555 display_name: &str,
2556 ) -> HttpResult<update_device::v3::Response> {
2557 let mut request = update_device::v3::Request::new(device_id.to_owned());
2558 request.display_name = Some(display_name.to_owned());
2559
2560 self.send(request).await
2561 }
2562
2563 /// Check whether a device with a specific ID exists on the server.
2564 ///
2565 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2566 /// with 404 and the underlying error otherwise.
2567 ///
2568 /// # Arguments
2569 ///
2570 /// * `device_id` - The ID of the device to query.
2571 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2572 let request = device::get_device::v3::Request::new(device_id);
2573 match self.send(request).await {
2574 Ok(_) => Ok(true),
2575 Err(err) => {
2576 if let Some(error) = err.as_client_api_error()
2577 && error.status_code == 404
2578 {
2579 Ok(false)
2580 } else {
2581 Err(err.into())
2582 }
2583 }
2584 }
2585 }
2586
2587 /// Synchronize the client's state with the latest state on the server.
2588 ///
2589 /// ## Syncing Events
2590 ///
2591 /// Messages or any other type of event need to be periodically fetched from
2592 /// the server, this is achieved by sending a `/sync` request to the server.
2593 ///
2594 /// The first sync is sent out without a [`token`]. The response of the
2595 /// first sync will contain a [`next_batch`] field which should then be
2596 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2597 /// don't receive the same events multiple times.
2598 ///
2599 /// ## Long Polling
2600 ///
2601 /// A sync should in the usual case always be in flight. The
2602 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2603 /// long the server will wait for new events before it will respond.
2604 /// The server will respond immediately if some new events arrive before the
2605 /// timeout has expired. If no changes arrive and the timeout expires an
2606 /// empty sync response will be sent to the client.
2607 ///
2608 /// This method of sending a request that may not receive a response
2609 /// immediately is called long polling.
2610 ///
2611 /// ## Filtering Events
2612 ///
2613 /// The number or type of messages and events that the client should receive
2614 /// from the server can be altered using a [`Filter`].
2615 ///
2616 /// Filters can be non-trivial and, since they will be sent with every sync
2617 /// request, they may take up a bunch of unnecessary bandwidth.
2618 ///
2619 /// Luckily filters can be uploaded to the server and reused using an unique
2620 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2621 /// method.
2622 ///
2623 /// # Arguments
2624 ///
2625 /// * `sync_settings` - Settings for the sync call, this allows us to set
2626 /// various options to configure the sync:
2627 /// * [`filter`] - To configure which events we receive and which get
2628 /// [filtered] by the server
2629 /// * [`timeout`] - To configure our [long polling] setup.
2630 /// * [`token`] - To tell the server which events we already received
2631 /// and where we wish to continue syncing.
2632 /// * [`full_state`] - To tell the server that we wish to receive all
2633 /// state events, regardless of our configured [`token`].
2634 /// * [`set_presence`] - To tell the server to set the presence and to
2635 /// which state.
2636 ///
2637 /// # Examples
2638 ///
2639 /// ```no_run
2640 /// # use url::Url;
2641 /// # async {
2642 /// # let homeserver = Url::parse("http://localhost:8080")?;
2643 /// # let username = "";
2644 /// # let password = "";
2645 /// use matrix_sdk::{
2646 /// Client, config::SyncSettings,
2647 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2648 /// };
2649 ///
2650 /// let client = Client::new(homeserver).await?;
2651 /// client.matrix_auth().login_username(username, password).send().await?;
2652 ///
2653 /// // Sync once so we receive the client state and old messages.
2654 /// client.sync_once(SyncSettings::default()).await?;
2655 ///
2656 /// // Register our handler so we start responding once we receive a new
2657 /// // event.
2658 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2659 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2660 /// });
2661 ///
2662 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2663 /// // from our `sync_once()` call automatically.
2664 /// client.sync(SyncSettings::default()).await;
2665 /// # anyhow::Ok(()) };
2666 /// ```
2667 ///
2668 /// [`sync`]: #method.sync
2669 /// [`SyncSettings`]: crate::config::SyncSettings
2670 /// [`token`]: crate::config::SyncSettings#method.token
2671 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2672 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2673 /// [`set_presence`]: ruma::presence::PresenceState
2674 /// [`filter`]: crate::config::SyncSettings#method.filter
2675 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2676 /// [`next_batch`]: SyncResponse#structfield.next_batch
2677 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2678 /// [long polling]: #long-polling
2679 /// [filtered]: #filtering-events
2680 #[instrument(skip(self))]
2681 pub async fn sync_once(
2682 &self,
2683 sync_settings: crate::config::SyncSettings,
2684 ) -> Result<SyncResponse> {
2685 // The sync might not return for quite a while due to the timeout.
2686 // We'll see if there's anything crypto related to send out before we
2687 // sync, i.e. if we closed our client after a sync but before the
2688 // crypto requests were sent out.
2689 //
2690 // This will mostly be a no-op.
2691 #[cfg(feature = "e2e-encryption")]
2692 if let Err(e) = self.send_outgoing_requests().await {
2693 error!(error = ?e, "Error while sending outgoing E2EE requests");
2694 }
2695
2696 let token = match sync_settings.token {
2697 SyncToken::Specific(token) => Some(token),
2698 SyncToken::NoToken => None,
2699 SyncToken::ReusePrevious => self.sync_token().await,
2700 };
2701
2702 let request = assign!(sync_events::v3::Request::new(), {
2703 filter: sync_settings.filter.map(|f| *f),
2704 since: token,
2705 full_state: sync_settings.full_state,
2706 set_presence: sync_settings.set_presence,
2707 timeout: sync_settings.timeout,
2708 use_state_after: true,
2709 });
2710 let mut request_config = self.request_config();
2711 if let Some(timeout) = sync_settings.timeout {
2712 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2713 request_config.timeout = Some(base_timeout + timeout);
2714 }
2715
2716 let response = self.send(request).with_request_config(request_config).await?;
2717 let next_batch = response.next_batch.clone();
2718 let response = self.process_sync(response).await?;
2719
2720 #[cfg(feature = "e2e-encryption")]
2721 if let Err(e) = self.send_outgoing_requests().await {
2722 error!(error = ?e, "Error while sending outgoing E2EE requests");
2723 }
2724
2725 self.inner.sync_beat.notify(usize::MAX);
2726
2727 Ok(SyncResponse::new(next_batch, response))
2728 }
2729
2730 /// Repeatedly synchronize the client state with the server.
2731 ///
2732 /// This method will only return on error, if cancellation is needed
2733 /// the method should be wrapped in a cancelable task or the
2734 /// [`Client::sync_with_callback`] method can be used or
2735 /// [`Client::sync_with_result_callback`] if you want to handle error
2736 /// cases in the loop, too.
2737 ///
2738 /// This method will internally call [`Client::sync_once`] in a loop.
2739 ///
2740 /// This method can be used with the [`Client::add_event_handler`]
2741 /// method to react to individual events. If you instead wish to handle
2742 /// events in a bulk manner the [`Client::sync_with_callback`],
2743 /// [`Client::sync_with_result_callback`] and
2744 /// [`Client::sync_stream`] methods can be used instead. Those methods
2745 /// repeatedly return the whole sync response.
2746 ///
2747 /// # Arguments
2748 ///
2749 /// * `sync_settings` - Settings for the sync call. *Note* that those
2750 /// settings will be only used for the first sync call. See the argument
2751 /// docs for [`Client::sync_once`] for more info.
2752 ///
2753 /// # Return
2754 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2755 /// up to the user of the API to check the error and decide whether the sync
2756 /// should continue or not.
2757 ///
2758 /// # Examples
2759 ///
2760 /// ```no_run
2761 /// # use url::Url;
2762 /// # async {
2763 /// # let homeserver = Url::parse("http://localhost:8080")?;
2764 /// # let username = "";
2765 /// # let password = "";
2766 /// use matrix_sdk::{
2767 /// Client, config::SyncSettings,
2768 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2769 /// };
2770 ///
2771 /// let client = Client::new(homeserver).await?;
2772 /// client.matrix_auth().login_username(&username, &password).send().await?;
2773 ///
2774 /// // Register our handler so we start responding once we receive a new
2775 /// // event.
2776 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2777 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2778 /// });
2779 ///
2780 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2781 /// // automatically.
2782 /// client.sync(SyncSettings::default()).await?;
2783 /// # anyhow::Ok(()) };
2784 /// ```
2785 ///
2786 /// [argument docs]: #method.sync_once
2787 /// [`sync_with_callback`]: #method.sync_with_callback
2788 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2789 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2790 }
2791
2792 /// Repeatedly call sync to synchronize the client state with the server.
2793 ///
2794 /// # Arguments
2795 ///
2796 /// * `sync_settings` - Settings for the sync call. *Note* that those
2797 /// settings will be only used for the first sync call. See the argument
2798 /// docs for [`Client::sync_once`] for more info.
2799 ///
2800 /// * `callback` - A callback that will be called every time a successful
2801 /// response has been fetched from the server. The callback must return a
2802 /// boolean which signalizes if the method should stop syncing. If the
2803 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2804 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2805 ///
2806 /// # Return
2807 /// The sync runs until an error occurs or the
2808 /// callback indicates that the Loop should stop. If the callback asked for
2809 /// a regular stop, the result will be `Ok(())` otherwise the
2810 /// `Err(Error)` is returned.
2811 ///
2812 /// # Examples
2813 ///
2814 /// The following example demonstrates how to sync forever while sending all
2815 /// the interesting events through a mpsc channel to another thread e.g. a
2816 /// UI thread.
2817 ///
2818 /// ```no_run
2819 /// # use std::time::Duration;
2820 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2821 /// # use url::Url;
2822 /// # async {
2823 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2824 /// # let mut client = Client::new(homeserver).await.unwrap();
2825 ///
2826 /// use tokio::sync::mpsc::channel;
2827 ///
2828 /// let (tx, rx) = channel(100);
2829 ///
2830 /// let sync_channel = &tx;
2831 /// let sync_settings = SyncSettings::new()
2832 /// .timeout(Duration::from_secs(30));
2833 ///
2834 /// client
2835 /// .sync_with_callback(sync_settings, |response| async move {
2836 /// let channel = sync_channel;
2837 /// for (room_id, room) in response.rooms.joined {
2838 /// for event in room.timeline.events {
2839 /// channel.send(event).await.unwrap();
2840 /// }
2841 /// }
2842 ///
2843 /// LoopCtrl::Continue
2844 /// })
2845 /// .await;
2846 /// };
2847 /// ```
2848 #[instrument(skip_all)]
2849 pub async fn sync_with_callback<C>(
2850 &self,
2851 sync_settings: crate::config::SyncSettings,
2852 callback: impl Fn(SyncResponse) -> C,
2853 ) -> Result<(), Error>
2854 where
2855 C: Future<Output = LoopCtrl>,
2856 {
2857 self.sync_with_result_callback(sync_settings, |result| async {
2858 Ok(callback(result?).await)
2859 })
2860 .await
2861 }
2862
2863 /// Repeatedly call sync to synchronize the client state with the server.
2864 ///
2865 /// # Arguments
2866 ///
2867 /// * `sync_settings` - Settings for the sync call. *Note* that those
2868 /// settings will be only used for the first sync call. See the argument
2869 /// docs for [`Client::sync_once`] for more info.
2870 ///
2871 /// * `callback` - A callback that will be called every time after a
2872 /// response has been received, failure or not. The callback returns a
2873 /// `Result<LoopCtrl, Error>`, too. When returning
2874 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2875 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2876 /// function returns `Ok(())`. In case the callback can't handle the
2877 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2878 /// which results in the sync ending and the `Err(Error)` being returned.
2879 ///
2880 /// # Return
2881 /// The sync runs until an error occurs that the callback can't handle or
2882 /// the callback indicates that the Loop should stop. If the callback
2883 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2884 /// `Err(Error)` is returned.
2885 ///
2886 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2887 /// this, and are handled first without sending the result to the
2888 /// callback. Only after they have exceeded is the `Result` handed to
2889 /// the callback.
2890 ///
2891 /// # Examples
2892 ///
2893 /// The following example demonstrates how to sync forever while sending all
2894 /// the interesting events through a mpsc channel to another thread e.g. a
2895 /// UI thread.
2896 ///
2897 /// ```no_run
2898 /// # use std::time::Duration;
2899 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2900 /// # use url::Url;
2901 /// # async {
2902 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2903 /// # let mut client = Client::new(homeserver).await.unwrap();
2904 /// #
2905 /// use tokio::sync::mpsc::channel;
2906 ///
2907 /// let (tx, rx) = channel(100);
2908 ///
2909 /// let sync_channel = &tx;
2910 /// let sync_settings = SyncSettings::new()
2911 /// .timeout(Duration::from_secs(30));
2912 ///
2913 /// client
2914 /// .sync_with_result_callback(sync_settings, |response| async move {
2915 /// let channel = sync_channel;
2916 /// let sync_response = response?;
2917 /// for (room_id, room) in sync_response.rooms.joined {
2918 /// for event in room.timeline.events {
2919 /// channel.send(event).await.unwrap();
2920 /// }
2921 /// }
2922 ///
2923 /// Ok(LoopCtrl::Continue)
2924 /// })
2925 /// .await;
2926 /// };
2927 /// ```
2928 #[instrument(skip(self, callback))]
2929 pub async fn sync_with_result_callback<C>(
2930 &self,
2931 sync_settings: crate::config::SyncSettings,
2932 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2933 ) -> Result<(), Error>
2934 where
2935 C: Future<Output = Result<LoopCtrl, Error>>,
2936 {
2937 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2938
2939 while let Some(result) = sync_stream.next().await {
2940 trace!("Running callback");
2941 if callback(result).await? == LoopCtrl::Break {
2942 trace!("Callback told us to stop");
2943 break;
2944 }
2945 trace!("Done running callback");
2946 }
2947
2948 Ok(())
2949 }
2950
2951 //// Repeatedly synchronize the client state with the server.
2952 ///
2953 /// This method will internally call [`Client::sync_once`] in a loop and is
2954 /// equivalent to the [`Client::sync`] method but the responses are provided
2955 /// as an async stream.
2956 ///
2957 /// # Arguments
2958 ///
2959 /// * `sync_settings` - Settings for the sync call. *Note* that those
2960 /// settings will be only used for the first sync call. See the argument
2961 /// docs for [`Client::sync_once`] for more info.
2962 ///
2963 /// # Examples
2964 ///
2965 /// ```no_run
2966 /// # use url::Url;
2967 /// # async {
2968 /// # let homeserver = Url::parse("http://localhost:8080")?;
2969 /// # let username = "";
2970 /// # let password = "";
2971 /// use futures_util::StreamExt;
2972 /// use matrix_sdk::{Client, config::SyncSettings};
2973 ///
2974 /// let client = Client::new(homeserver).await?;
2975 /// client.matrix_auth().login_username(&username, &password).send().await?;
2976 ///
2977 /// let mut sync_stream =
2978 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2979 ///
2980 /// while let Some(Ok(response)) = sync_stream.next().await {
2981 /// for room in response.rooms.joined.values() {
2982 /// for e in &room.timeline.events {
2983 /// if let Ok(event) = e.raw().deserialize() {
2984 /// println!("Received event {:?}", event);
2985 /// }
2986 /// }
2987 /// }
2988 /// }
2989 ///
2990 /// # anyhow::Ok(()) };
2991 /// ```
2992 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2993 #[instrument(skip(self))]
2994 pub async fn sync_stream(
2995 &self,
2996 mut sync_settings: crate::config::SyncSettings,
2997 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2998 let mut is_first_sync = true;
2999 let mut timeout = None;
3000 let mut last_sync_time: Option<Instant> = None;
3001
3002 let parent_span = Span::current();
3003
3004 async_stream::stream!({
3005 loop {
3006 trace!("Syncing");
3007
3008 if sync_settings.ignore_timeout_on_first_sync {
3009 if is_first_sync {
3010 timeout = sync_settings.timeout.take();
3011 } else if sync_settings.timeout.is_none() && timeout.is_some() {
3012 sync_settings.timeout = timeout.take();
3013 }
3014
3015 is_first_sync = false;
3016 }
3017
3018 yield self
3019 .sync_loop_helper(&mut sync_settings)
3020 .instrument(parent_span.clone())
3021 .await;
3022
3023 Client::delay_sync(&mut last_sync_time).await
3024 }
3025 })
3026 }
3027
3028 /// Get the current, if any, sync token of the client.
3029 /// This will be None if the client didn't sync at least once.
3030 pub(crate) async fn sync_token(&self) -> Option<String> {
3031 self.inner.base_client.sync_token().await
3032 }
3033
3034 /// Gets information about the owner of a given access token.
3035 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3036 let request = whoami::v3::Request::new();
3037 self.send(request).await
3038 }
3039
3040 /// Subscribes a new receiver to client SessionChange broadcasts.
3041 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3042 let broadcast = &self.auth_ctx().session_change_sender;
3043 broadcast.subscribe()
3044 }
3045
3046 /// Sets the save/restore session callbacks.
3047 ///
3048 /// This is another mechanism to get synchronous updates to session tokens,
3049 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3050 pub fn set_session_callbacks(
3051 &self,
3052 reload_session_callback: Box<ReloadSessionCallback>,
3053 save_session_callback: Box<SaveSessionCallback>,
3054 ) -> Result<()> {
3055 self.inner
3056 .auth_ctx
3057 .reload_session_callback
3058 .set(reload_session_callback)
3059 .map_err(|_| Error::MultipleSessionCallbacks)?;
3060
3061 self.inner
3062 .auth_ctx
3063 .save_session_callback
3064 .set(save_session_callback)
3065 .map_err(|_| Error::MultipleSessionCallbacks)?;
3066
3067 Ok(())
3068 }
3069
3070 /// Get the notification settings of the current owner of the client.
3071 pub async fn notification_settings(&self) -> NotificationSettings {
3072 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3073 NotificationSettings::new(self.clone(), ruleset)
3074 }
3075
3076 /// Create a new specialized `Client` that can process notifications.
3077 ///
3078 /// See [`CrossProcessLock::new`] to learn more about
3079 /// `cross_process_store_locks_holder_name`.
3080 ///
3081 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3082 pub async fn notification_client(
3083 &self,
3084 cross_process_store_locks_holder_name: String,
3085 ) -> Result<Client> {
3086 let client = Client {
3087 inner: ClientInner::new(
3088 self.inner.auth_ctx.clone(),
3089 self.server().cloned(),
3090 self.homeserver(),
3091 self.sliding_sync_version(),
3092 self.inner.http_client.clone(),
3093 self.inner
3094 .base_client
3095 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
3096 .await?,
3097 self.inner.caches.supported_versions.read().await.clone(),
3098 self.inner.caches.well_known.read().await.clone(),
3099 self.inner.respect_login_well_known,
3100 self.inner.event_cache.clone(),
3101 self.inner.send_queue_data.clone(),
3102 self.inner.latest_events.clone(),
3103 #[cfg(feature = "e2e-encryption")]
3104 self.inner.e2ee.encryption_settings,
3105 #[cfg(feature = "e2e-encryption")]
3106 self.inner.enable_share_history_on_invite,
3107 cross_process_store_locks_holder_name,
3108 #[cfg(feature = "experimental-search")]
3109 self.inner.search_index.clone(),
3110 self.inner.thread_subscription_catchup.clone(),
3111 )
3112 .await,
3113 };
3114
3115 Ok(client)
3116 }
3117
3118 /// The [`EventCache`] instance for this [`Client`].
3119 pub fn event_cache(&self) -> &EventCache {
3120 // SAFETY: always initialized in the `Client` ctor.
3121 self.inner.event_cache.get().unwrap()
3122 }
3123
3124 /// The [`LatestEvents`] instance for this [`Client`].
3125 pub async fn latest_events(&self) -> &LatestEvents {
3126 self.inner
3127 .latest_events
3128 .get_or_init(|| async {
3129 LatestEvents::new(
3130 WeakClient::from_client(self),
3131 self.event_cache().clone(),
3132 SendQueue::new(self.clone()),
3133 )
3134 })
3135 .await
3136 }
3137
3138 /// Waits until an at least partially synced room is received, and returns
3139 /// it.
3140 ///
3141 /// **Note: this function will loop endlessly until either it finds the room
3142 /// or an externally set timeout happens.**
3143 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3144 loop {
3145 if let Some(room) = self.get_room(room_id) {
3146 if room.is_state_partially_or_fully_synced() {
3147 debug!("Found just created room!");
3148 return room;
3149 }
3150 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3151 } else {
3152 debug!("Room wasn't found, waiting for sync beat to try again");
3153 }
3154 self.inner.sync_beat.listen().await;
3155 }
3156 }
3157
3158 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3159 /// join it.
3160 pub async fn knock(
3161 &self,
3162 room_id_or_alias: OwnedRoomOrAliasId,
3163 reason: Option<String>,
3164 server_names: Vec<OwnedServerName>,
3165 ) -> Result<Room> {
3166 let request =
3167 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3168 let response = self.send(request).await?;
3169 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3170 Ok(Room::new(self.clone(), base_room))
3171 }
3172
3173 /// Checks whether the provided `user_id` belongs to an ignored user.
3174 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3175 self.base_client().is_user_ignored(user_id).await
3176 }
3177
3178 /// Gets the `max_upload_size` value from the homeserver, getting either a
3179 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3180 /// missing.
3181 ///
3182 /// Check the spec for more info:
3183 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3184 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3185 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3186 if let Some(data) = max_upload_size_lock.get() {
3187 return Ok(data.to_owned());
3188 }
3189
3190 // Use the authenticated endpoint when the server supports it.
3191 let supported_versions = self.supported_versions().await?;
3192 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3193 .is_supported(&supported_versions);
3194
3195 let upload_size = if use_auth {
3196 self.send(authenticated_media::get_media_config::v1::Request::default())
3197 .await?
3198 .upload_size
3199 } else {
3200 #[allow(deprecated)]
3201 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3202 };
3203
3204 match max_upload_size_lock.set(upload_size) {
3205 Ok(_) => Ok(upload_size),
3206 Err(error) => {
3207 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3208 }
3209 }
3210 }
3211
3212 /// The settings to use for decrypting events.
3213 #[cfg(feature = "e2e-encryption")]
3214 pub fn decryption_settings(&self) -> &DecryptionSettings {
3215 &self.base_client().decryption_settings
3216 }
3217
3218 /// Returns the [`SearchIndex`] for this [`Client`].
3219 #[cfg(feature = "experimental-search")]
3220 pub fn search_index(&self) -> &SearchIndex {
3221 &self.inner.search_index
3222 }
3223
3224 /// Whether the client is configured to take thread subscriptions (MSC4306
3225 /// and MSC4308) into account.
3226 ///
3227 /// This may cause filtering out of thread subscriptions, and loading the
3228 /// thread subscriptions via the sliding sync extension, when the room
3229 /// list service is being used.
3230 pub fn enabled_thread_subscriptions(&self) -> bool {
3231 match self.base_client().threading_support {
3232 ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
3233 ThreadingSupport::Disabled => false,
3234 }
3235 }
3236
3237 /// Fetch thread subscriptions changes between `from` and up to `to`.
3238 ///
3239 /// The `limit` optional parameter can be used to limit the number of
3240 /// entries in a response. It can also be overridden by the server, if
3241 /// it's deemed too large.
3242 pub async fn fetch_thread_subscriptions(
3243 &self,
3244 from: Option<String>,
3245 to: Option<String>,
3246 limit: Option<UInt>,
3247 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3248 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3249 from,
3250 to,
3251 limit,
3252 });
3253 Ok(self.send(request).await?)
3254 }
3255
3256 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3257 self.inner.thread_subscription_catchup.get().unwrap()
3258 }
3259
3260 /// Perform database optimizations if any are available, i.e. vacuuming in
3261 /// SQLite.
3262 ///
3263 /// **Warning:** this was added to check if SQLite fragmentation was the
3264 /// source of performance issues, **DO NOT use in production**.
3265 #[doc(hidden)]
3266 pub async fn optimize_stores(&self) -> Result<()> {
3267 trace!("Optimizing state store...");
3268 self.state_store().optimize().await?;
3269
3270 trace!("Optimizing event cache store...");
3271 if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3272 clean_lock.optimize().await?;
3273 }
3274
3275 trace!("Optimizing media store...");
3276 self.media_store().lock().await?.optimize().await?;
3277
3278 Ok(())
3279 }
3280
3281 /// Returns the sizes of the existing stores, if known.
3282 pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3283 #[cfg(feature = "e2e-encryption")]
3284 let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3285 && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3286 {
3287 Some(store_size)
3288 } else {
3289 None
3290 };
3291 #[cfg(not(feature = "e2e-encryption"))]
3292 let crypto_store_size = None;
3293
3294 let state_store_size = self.state_store().get_size().await.ok().flatten();
3295
3296 let event_cache_store_size = if let Some(clean_lock) =
3297 self.event_cache_store().lock().await?.as_clean()
3298 && let Ok(Some(store_size)) = clean_lock.get_size().await
3299 {
3300 Some(store_size)
3301 } else {
3302 None
3303 };
3304
3305 let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3306
3307 Ok(StoreSizes {
3308 crypto_store: crypto_store_size,
3309 state_store: state_store_size,
3310 event_cache_store: event_cache_store_size,
3311 media_store: media_store_size,
3312 })
3313 }
3314}
3315
3316/// Contains the disk size of the different stores, if known. It won't be
3317/// available for in-memory stores.
3318#[derive(Debug, Clone)]
3319pub struct StoreSizes {
3320 /// The size of the CryptoStore.
3321 pub crypto_store: Option<usize>,
3322 /// The size of the StateStore.
3323 pub state_store: Option<usize>,
3324 /// The size of the EventCacheStore.
3325 pub event_cache_store: Option<usize>,
3326 /// The size of the MediaStore.
3327 pub media_store: Option<usize>,
3328}
3329
3330#[cfg(any(feature = "testing", test))]
3331impl Client {
3332 /// Test helper to mark users as tracked by the crypto layer.
3333 #[cfg(feature = "e2e-encryption")]
3334 pub async fn update_tracked_users_for_testing(
3335 &self,
3336 user_ids: impl IntoIterator<Item = &UserId>,
3337 ) {
3338 let olm = self.olm_machine().await;
3339 let olm = olm.as_ref().unwrap();
3340 olm.update_tracked_users(user_ids).await.unwrap();
3341 }
3342}
3343
3344/// A weak reference to the inner client, useful when trying to get a handle
3345/// on the owning client.
3346#[derive(Clone, Debug)]
3347pub(crate) struct WeakClient {
3348 client: Weak<ClientInner>,
3349}
3350
3351impl WeakClient {
3352 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3353 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3354 Self { client: Arc::downgrade(client) }
3355 }
3356
3357 /// Construct a [`WeakClient`] from a [`Client`].
3358 pub fn from_client(client: &Client) -> Self {
3359 Self::from_inner(&client.inner)
3360 }
3361
3362 /// Attempts to get a [`Client`] from this [`WeakClient`].
3363 pub fn get(&self) -> Option<Client> {
3364 self.client.upgrade().map(|inner| Client { inner })
3365 }
3366
3367 /// Gets the number of strong (`Arc`) pointers still pointing to this
3368 /// client.
3369 #[allow(dead_code)]
3370 pub fn strong_count(&self) -> usize {
3371 self.client.strong_count()
3372 }
3373}
3374
3375/// Information about the state of a room before we joined it.
3376#[derive(Debug, Clone, Default)]
3377struct PreJoinRoomInfo {
3378 /// The user who invited us to the room, if any.
3379 pub inviter: Option<RoomMember>,
3380}
3381
3382// The http mocking library is not supported for wasm32
3383#[cfg(all(test, not(target_family = "wasm")))]
3384pub(crate) mod tests {
3385 use std::{sync::Arc, time::Duration};
3386
3387 use assert_matches::assert_matches;
3388 use assert_matches2::assert_let;
3389 use eyeball::SharedObservable;
3390 use futures_util::{FutureExt, StreamExt, pin_mut};
3391 use js_int::{UInt, uint};
3392 use matrix_sdk_base::{
3393 RoomState,
3394 store::{MemoryStore, StoreConfig},
3395 };
3396 use matrix_sdk_test::{
3397 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3398 event_factory::EventFactory,
3399 };
3400 #[cfg(target_family = "wasm")]
3401 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3402
3403 use ruma::{
3404 RoomId, ServerName, UserId,
3405 api::{
3406 FeatureFlag, MatrixVersion,
3407 client::{
3408 discovery::discover_homeserver::RtcFocusInfo,
3409 room::create_room::v3::Request as CreateRoomRequest,
3410 },
3411 },
3412 assign,
3413 events::{
3414 ignored_user_list::IgnoredUserListEventContent,
3415 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3416 },
3417 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3418 };
3419 use serde_json::json;
3420 use stream_assert::{assert_next_matches, assert_pending};
3421 use tokio::{
3422 spawn,
3423 time::{sleep, timeout},
3424 };
3425 use url::Url;
3426
3427 use super::Client;
3428 use crate::{
3429 Error, TransmissionProgress,
3430 client::{WeakClient, futures::SendMediaUploadRequest},
3431 config::{RequestConfig, SyncSettings},
3432 futures::SendRequest,
3433 media::MediaError,
3434 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3435 };
3436
3437 #[async_test]
3438 async fn test_account_data() {
3439 let server = MatrixMockServer::new().await;
3440 let client = server.client_builder().build().await;
3441
3442 let f = EventFactory::new();
3443 server
3444 .mock_sync()
3445 .ok_and_run(&client, |builder| {
3446 builder.add_global_account_data(
3447 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3448 );
3449 })
3450 .await;
3451
3452 let content = client
3453 .account()
3454 .account_data::<IgnoredUserListEventContent>()
3455 .await
3456 .unwrap()
3457 .unwrap()
3458 .deserialize()
3459 .unwrap();
3460
3461 assert_eq!(content.ignored_users.len(), 1);
3462 }
3463
3464 #[async_test]
3465 async fn test_successful_discovery() {
3466 // Imagine this is `matrix.org`.
3467 let server = MatrixMockServer::new().await;
3468 let server_url = server.uri();
3469
3470 // Imagine this is `matrix-client.matrix.org`.
3471 let homeserver = MatrixMockServer::new().await;
3472 let homeserver_url = homeserver.uri();
3473
3474 // Imagine Alice has the user ID `@alice:matrix.org`.
3475 let domain = server_url.strip_prefix("http://").unwrap();
3476 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3477
3478 // The `.well-known` is on the server (e.g. `matrix.org`).
3479 server
3480 .mock_well_known()
3481 .ok_with_homeserver_url(&homeserver_url)
3482 .mock_once()
3483 .named("well-known")
3484 .mount()
3485 .await;
3486
3487 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3488 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3489
3490 let client = Client::builder()
3491 .insecure_server_name_no_tls(alice.server_name())
3492 .build()
3493 .await
3494 .unwrap();
3495
3496 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3497 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3498 client.server_versions().await.unwrap();
3499 }
3500
3501 #[async_test]
3502 async fn test_discovery_broken_server() {
3503 let server = MatrixMockServer::new().await;
3504 let server_url = server.uri();
3505 let domain = server_url.strip_prefix("http://").unwrap();
3506 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3507
3508 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3509
3510 assert!(
3511 Client::builder()
3512 .insecure_server_name_no_tls(alice.server_name())
3513 .build()
3514 .await
3515 .is_err(),
3516 "Creating a client from a user ID should fail when the .well-known request fails."
3517 );
3518 }
3519
3520 #[async_test]
3521 async fn test_room_creation() {
3522 let server = MatrixMockServer::new().await;
3523 let client = server.client_builder().build().await;
3524
3525 server
3526 .mock_sync()
3527 .ok_and_run(&client, |builder| {
3528 builder.add_joined_room(
3529 JoinedRoomBuilder::default()
3530 .add_state_event(StateTestEvent::Member)
3531 .add_state_event(StateTestEvent::PowerLevels),
3532 );
3533 })
3534 .await;
3535
3536 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3537 assert_eq!(room.state(), RoomState::Joined);
3538 }
3539
3540 #[async_test]
3541 async fn test_retry_limit_http_requests() {
3542 let server = MatrixMockServer::new().await;
3543 let client = server
3544 .client_builder()
3545 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3546 .build()
3547 .await;
3548
3549 assert!(client.request_config().retry_limit.unwrap() == 3);
3550
3551 server.mock_login().error500().expect(3).mount().await;
3552
3553 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3554 }
3555
3556 #[async_test]
3557 async fn test_retry_timeout_http_requests() {
3558 // Keep this timeout small so that the test doesn't take long
3559 let retry_timeout = Duration::from_secs(5);
3560 let server = MatrixMockServer::new().await;
3561 let client = server
3562 .client_builder()
3563 .on_builder(|builder| {
3564 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3565 })
3566 .build()
3567 .await;
3568
3569 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3570
3571 server.mock_login().error500().expect(2..).mount().await;
3572
3573 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3574 }
3575
3576 #[async_test]
3577 async fn test_short_retry_initial_http_requests() {
3578 let server = MatrixMockServer::new().await;
3579 let client = server
3580 .client_builder()
3581 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3582 .build()
3583 .await;
3584
3585 server.mock_login().error500().expect(3..).mount().await;
3586
3587 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3588 }
3589
3590 #[async_test]
3591 async fn test_no_retry_http_requests() {
3592 let server = MatrixMockServer::new().await;
3593 let client = server.client_builder().build().await;
3594
3595 server.mock_devices().error500().mock_once().mount().await;
3596
3597 client.devices().await.unwrap_err();
3598 }
3599
3600 #[async_test]
3601 async fn test_set_homeserver() {
3602 let client = MockClientBuilder::new(None).build().await;
3603 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3604
3605 let homeserver = Url::parse("http://example.com/").unwrap();
3606 client.set_homeserver(homeserver.clone());
3607 assert_eq!(client.homeserver(), homeserver);
3608 }
3609
3610 #[async_test]
3611 async fn test_search_user_request() {
3612 let server = MatrixMockServer::new().await;
3613 let client = server.client_builder().build().await;
3614
3615 server.mock_user_directory().ok().mock_once().mount().await;
3616
3617 let response = client.search_users("test", 50).await.unwrap();
3618 assert_eq!(response.results.len(), 1);
3619 let result = &response.results[0];
3620 assert_eq!(result.user_id.to_string(), "@test:example.me");
3621 assert_eq!(result.display_name.clone().unwrap(), "Test");
3622 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3623 assert!(!response.limited);
3624 }
3625
3626 #[async_test]
3627 async fn test_request_unstable_features() {
3628 let server = MatrixMockServer::new().await;
3629 let client = server.client_builder().no_server_versions().build().await;
3630
3631 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3632
3633 let unstable_features = client.unstable_features().await.unwrap();
3634 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3635 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3636 }
3637
3638 #[async_test]
3639 async fn test_can_homeserver_push_encrypted_event_to_device() {
3640 let server = MatrixMockServer::new().await;
3641 let client = server.client_builder().no_server_versions().build().await;
3642
3643 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3644
3645 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3646 assert!(msc4028_enabled);
3647 }
3648
3649 #[async_test]
3650 async fn test_recently_visited_rooms() {
3651 // Tracking recently visited rooms requires authentication
3652 let client = MockClientBuilder::new(None).unlogged().build().await;
3653 assert_matches!(
3654 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3655 Err(Error::AuthenticationRequired)
3656 );
3657
3658 let client = MockClientBuilder::new(None).build().await;
3659 let account = client.account();
3660
3661 // We should start off with an empty list
3662 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3663
3664 // Tracking a valid room id should add it to the list
3665 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3666 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3667 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3668
3669 // And the existing list shouldn't be changed
3670 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3671 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3672
3673 // Tracking the same room again shouldn't change the list
3674 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3675 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3676 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3677
3678 // Tracking a second room should add it to the front of the list
3679 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3680 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3681 assert_eq!(
3682 account.get_recently_visited_rooms().await.unwrap(),
3683 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3684 );
3685
3686 // Tracking the first room yet again should move it to the front of the list
3687 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3688 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3689 assert_eq!(
3690 account.get_recently_visited_rooms().await.unwrap(),
3691 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3692 );
3693
3694 // Tracking should be capped at 20
3695 for n in 0..20 {
3696 account
3697 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3698 .await
3699 .unwrap();
3700 }
3701
3702 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3703
3704 // And the initial rooms should've been pushed out
3705 let rooms = account.get_recently_visited_rooms().await.unwrap();
3706 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3707 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3708
3709 // And the last tracked room should be the first
3710 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3711 }
3712
3713 #[async_test]
3714 async fn test_client_no_cycle_with_event_cache() {
3715 let client = MockClientBuilder::new(None).build().await;
3716
3717 // Wait for the init tasks to die.
3718 sleep(Duration::from_secs(1)).await;
3719
3720 let weak_client = WeakClient::from_client(&client);
3721 assert_eq!(weak_client.strong_count(), 1);
3722
3723 {
3724 let room_id = room_id!("!room:example.org");
3725
3726 // Have the client know the room.
3727 let response = SyncResponseBuilder::default()
3728 .add_joined_room(JoinedRoomBuilder::new(room_id))
3729 .build_sync_response();
3730 client.inner.base_client.receive_sync_response(response).await.unwrap();
3731
3732 client.event_cache().subscribe().unwrap();
3733
3734 let (_room_event_cache, _drop_handles) =
3735 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3736 }
3737
3738 drop(client);
3739
3740 // Give a bit of time for background tasks to die.
3741 sleep(Duration::from_secs(1)).await;
3742
3743 // The weak client must be the last reference to the client now.
3744 assert_eq!(weak_client.strong_count(), 0);
3745 let client = weak_client.get();
3746 assert!(
3747 client.is_none(),
3748 "too many strong references to the client: {}",
3749 Arc::strong_count(&client.unwrap().inner)
3750 );
3751 }
3752
3753 #[async_test]
3754 async fn test_supported_versions_caching() {
3755 let server = MatrixMockServer::new().await;
3756
3757 let versions_mock = server
3758 .mock_versions()
3759 .expect_default_access_token()
3760 .ok_with_unstable_features()
3761 .named("first versions mock")
3762 .expect(1)
3763 .mount_as_scoped()
3764 .await;
3765
3766 let memory_store = Arc::new(MemoryStore::new());
3767 let client = server
3768 .client_builder()
3769 .no_server_versions()
3770 .on_builder(|builder| {
3771 builder.store_config(
3772 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3773 .state_store(memory_store.clone()),
3774 )
3775 })
3776 .build()
3777 .await;
3778
3779 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3780
3781 // The result was cached.
3782 assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3783 // This subsequent call hits the in-memory cache.
3784 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3785
3786 drop(client);
3787
3788 let client = server
3789 .client_builder()
3790 .no_server_versions()
3791 .on_builder(|builder| {
3792 builder.store_config(
3793 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3794 .state_store(memory_store.clone()),
3795 )
3796 })
3797 .build()
3798 .await;
3799
3800 // These calls to the new client hit the on-disk cache.
3801 assert!(
3802 client
3803 .unstable_features()
3804 .await
3805 .unwrap()
3806 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3807 );
3808 let supported = client.supported_versions().await.unwrap();
3809 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3810 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3811
3812 // Then this call hits the in-memory cache.
3813 let supported = client.supported_versions().await.unwrap();
3814 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3815 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3816
3817 drop(versions_mock);
3818
3819 // Now, reset the cache, and observe the endpoints being called again once.
3820 client.reset_supported_versions().await.unwrap();
3821
3822 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3823
3824 // Hits network again.
3825 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3826 // Hits in-memory cache again.
3827 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3828 }
3829
3830 #[async_test]
3831 async fn test_well_known_caching() {
3832 let server = MatrixMockServer::new().await;
3833 let server_url = server.uri();
3834 let domain = server_url.strip_prefix("http://").unwrap();
3835 let server_name = <&ServerName>::try_from(domain).unwrap();
3836 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3837
3838 let well_known_mock = server
3839 .mock_well_known()
3840 .ok()
3841 .named("well known mock")
3842 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3843 .mount_as_scoped()
3844 .await;
3845
3846 let memory_store = Arc::new(MemoryStore::new());
3847 let client = Client::builder()
3848 .insecure_server_name_no_tls(server_name)
3849 .store_config(
3850 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3851 .state_store(memory_store.clone()),
3852 )
3853 .build()
3854 .await
3855 .unwrap();
3856
3857 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3858
3859 // This subsequent call hits the in-memory cache.
3860 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3861
3862 drop(client);
3863
3864 let client = server
3865 .client_builder()
3866 .no_server_versions()
3867 .on_builder(|builder| {
3868 builder.store_config(
3869 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3870 .state_store(memory_store.clone()),
3871 )
3872 })
3873 .build()
3874 .await;
3875
3876 // This call to the new client hits the on-disk cache.
3877 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3878
3879 // Then this call hits the in-memory cache.
3880 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3881
3882 drop(well_known_mock);
3883
3884 // Now, reset the cache, and observe the endpoints being called again once.
3885 client.reset_well_known().await.unwrap();
3886
3887 server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3888
3889 // Hits network again.
3890 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3891 // Hits in-memory cache again.
3892 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3893 }
3894
3895 #[async_test]
3896 async fn test_missing_well_known_caching() {
3897 let server = MatrixMockServer::new().await;
3898 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3899
3900 let well_known_mock = server
3901 .mock_well_known()
3902 .error_unrecognized()
3903 .named("first well-known mock")
3904 .expect(1)
3905 .mount_as_scoped()
3906 .await;
3907
3908 let memory_store = Arc::new(MemoryStore::new());
3909 let client = server
3910 .client_builder()
3911 .on_builder(|builder| {
3912 builder.store_config(
3913 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3914 .state_store(memory_store.clone()),
3915 )
3916 })
3917 .build()
3918 .await;
3919
3920 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3921
3922 // This subsequent call hits the in-memory cache.
3923 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3924
3925 drop(client);
3926
3927 let client = server
3928 .client_builder()
3929 .on_builder(|builder| {
3930 builder.store_config(
3931 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3932 .state_store(memory_store.clone()),
3933 )
3934 })
3935 .build()
3936 .await;
3937
3938 // This call to the new client hits the on-disk cache.
3939 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3940
3941 // Then this call hits the in-memory cache.
3942 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3943
3944 drop(well_known_mock);
3945
3946 // Now, reset the cache, and observe the endpoints being called again once.
3947 client.reset_well_known().await.unwrap();
3948
3949 server
3950 .mock_well_known()
3951 .error_unrecognized()
3952 .expect(1)
3953 .named("second well-known mock")
3954 .mount()
3955 .await;
3956
3957 // Hits network again.
3958 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3959 // Hits in-memory cache again.
3960 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3961 }
3962
3963 #[async_test]
3964 async fn test_no_network_doesnt_cause_infinite_retries() {
3965 // We want infinite retries for transient errors.
3966 let client = MockClientBuilder::new(None)
3967 .on_builder(|builder| builder.request_config(RequestConfig::new()))
3968 .build()
3969 .await;
3970
3971 // We don't define a mock server on purpose here, so that the error is really a
3972 // network error.
3973 client.whoami().await.unwrap_err();
3974 }
3975
3976 #[async_test]
3977 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3978 let server = MatrixMockServer::new().await;
3979 let client = server.client_builder().build().await;
3980
3981 let room_id = room_id!("!room:example.org");
3982
3983 server
3984 .mock_sync()
3985 .ok_and_run(&client, |builder| {
3986 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3987 })
3988 .await;
3989
3990 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3991 assert_eq!(room.room_id(), room_id);
3992 }
3993
3994 #[async_test]
3995 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3996 let server = MatrixMockServer::new().await;
3997 let client = server.client_builder().build().await;
3998
3999 let room_id = room_id!("!room:example.org");
4000
4001 let client = Arc::new(client);
4002
4003 // Perform the /sync request with a delay so it starts after the
4004 // `await_room_remote_echo` call has happened
4005 spawn({
4006 let client = client.clone();
4007 async move {
4008 sleep(Duration::from_millis(100)).await;
4009
4010 server
4011 .mock_sync()
4012 .ok_and_run(&client, |builder| {
4013 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4014 })
4015 .await;
4016 }
4017 });
4018
4019 let room =
4020 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4021 assert_eq!(room.room_id(), room_id);
4022 }
4023
4024 #[async_test]
4025 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4026 let client = MockClientBuilder::new(None).build().await;
4027
4028 let room_id = room_id!("!room:example.org");
4029 // Room is not present so the client won't be able to find it. The call will
4030 // timeout.
4031 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4032 }
4033
4034 #[async_test]
4035 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4036 let server = MatrixMockServer::new().await;
4037 let client = server.client_builder().build().await;
4038
4039 server.mock_create_room().ok().mount().await;
4040
4041 // Create a room in the internal store
4042 let room = client
4043 .create_room(assign!(CreateRoomRequest::new(), {
4044 invite: vec![],
4045 is_direct: false,
4046 }))
4047 .await
4048 .unwrap();
4049
4050 // Room is locally present, but not synced, the call will timeout
4051 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4052 .await
4053 .unwrap_err();
4054 }
4055
4056 #[async_test]
4057 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4058 let server = MatrixMockServer::new().await;
4059 let client = server.client_builder().build().await;
4060
4061 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4062
4063 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4064 assert_matches!(ret, Ok(true));
4065 }
4066
4067 #[async_test]
4068 async fn test_is_room_alias_available_if_alias_is_resolved() {
4069 let server = MatrixMockServer::new().await;
4070 let client = server.client_builder().build().await;
4071
4072 server
4073 .mock_room_directory_resolve_alias()
4074 .ok("!some_room_id:matrix.org", Vec::new())
4075 .expect(1)
4076 .mount()
4077 .await;
4078
4079 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4080 assert_matches!(ret, Ok(false));
4081 }
4082
4083 #[async_test]
4084 async fn test_is_room_alias_available_if_error_found() {
4085 let server = MatrixMockServer::new().await;
4086 let client = server.client_builder().build().await;
4087
4088 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4089
4090 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4091 assert_matches!(ret, Err(_));
4092 }
4093
4094 #[async_test]
4095 async fn test_create_room_alias() {
4096 let server = MatrixMockServer::new().await;
4097 let client = server.client_builder().build().await;
4098
4099 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4100
4101 let ret = client
4102 .create_room_alias(
4103 room_alias_id!("#some_alias:matrix.org"),
4104 room_id!("!some_room:matrix.org"),
4105 )
4106 .await;
4107 assert_matches!(ret, Ok(()));
4108 }
4109
4110 #[async_test]
4111 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4112 let server = MatrixMockServer::new().await;
4113 let client = server.client_builder().build().await;
4114
4115 let room_id = room_id!("!a-room:matrix.org");
4116
4117 // Make sure the summary endpoint is called once
4118 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4119
4120 // We create a locally cached invited room
4121 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4122
4123 // And we get a preview, the server endpoint was reached
4124 let preview = client
4125 .get_room_preview(room_id.into(), Vec::new())
4126 .await
4127 .expect("Room preview should be retrieved");
4128
4129 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
4130 }
4131
4132 #[async_test]
4133 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4134 let server = MatrixMockServer::new().await;
4135 let client = server.client_builder().build().await;
4136
4137 let room_id = room_id!("!a-room:matrix.org");
4138
4139 // Make sure the summary endpoint is called once
4140 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4141
4142 // We create a locally cached left room
4143 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4144
4145 // And we get a preview, the server endpoint was reached
4146 let preview = client
4147 .get_room_preview(room_id.into(), Vec::new())
4148 .await
4149 .expect("Room preview should be retrieved");
4150
4151 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
4152 }
4153
4154 #[async_test]
4155 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4156 let server = MatrixMockServer::new().await;
4157 let client = server.client_builder().build().await;
4158
4159 let room_id = room_id!("!a-room:matrix.org");
4160
4161 // Make sure the summary endpoint is called once
4162 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4163
4164 // We create a locally cached knocked room
4165 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4166
4167 // And we get a preview, the server endpoint was reached
4168 let preview = client
4169 .get_room_preview(room_id.into(), Vec::new())
4170 .await
4171 .expect("Room preview should be retrieved");
4172
4173 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
4174 }
4175
4176 #[async_test]
4177 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4178 let server = MatrixMockServer::new().await;
4179 let client = server.client_builder().build().await;
4180
4181 let room_id = room_id!("!a-room:matrix.org");
4182
4183 // Make sure the summary endpoint is not called
4184 server.mock_room_summary().ok(room_id).never().mount().await;
4185
4186 // We create a locally cached joined room
4187 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4188
4189 // And we get a preview, no server endpoint was reached
4190 let preview = client
4191 .get_room_preview(room_id.into(), Vec::new())
4192 .await
4193 .expect("Room preview should be retrieved");
4194
4195 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
4196 }
4197
4198 #[async_test]
4199 async fn test_media_preview_config() {
4200 let server = MatrixMockServer::new().await;
4201 let client = server.client_builder().build().await;
4202
4203 server
4204 .mock_sync()
4205 .ok_and_run(&client, |builder| {
4206 builder.add_custom_global_account_data(json!({
4207 "content": {
4208 "media_previews": "private",
4209 "invite_avatars": "off"
4210 },
4211 "type": "m.media_preview_config"
4212 }));
4213 })
4214 .await;
4215
4216 let (initial_value, stream) =
4217 client.account().observe_media_preview_config().await.unwrap();
4218
4219 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4220 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4221 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4222 pin_mut!(stream);
4223 assert_pending!(stream);
4224
4225 server
4226 .mock_sync()
4227 .ok_and_run(&client, |builder| {
4228 builder.add_custom_global_account_data(json!({
4229 "content": {
4230 "media_previews": "off",
4231 "invite_avatars": "on"
4232 },
4233 "type": "m.media_preview_config"
4234 }));
4235 })
4236 .await;
4237
4238 assert_next_matches!(
4239 stream,
4240 MediaPreviewConfigEventContent {
4241 media_previews: Some(MediaPreviews::Off),
4242 invite_avatars: Some(InviteAvatars::On),
4243 ..
4244 }
4245 );
4246 assert_pending!(stream);
4247 }
4248
4249 #[async_test]
4250 async fn test_unstable_media_preview_config() {
4251 let server = MatrixMockServer::new().await;
4252 let client = server.client_builder().build().await;
4253
4254 server
4255 .mock_sync()
4256 .ok_and_run(&client, |builder| {
4257 builder.add_custom_global_account_data(json!({
4258 "content": {
4259 "media_previews": "private",
4260 "invite_avatars": "off"
4261 },
4262 "type": "io.element.msc4278.media_preview_config"
4263 }));
4264 })
4265 .await;
4266
4267 let (initial_value, stream) =
4268 client.account().observe_media_preview_config().await.unwrap();
4269
4270 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4271 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4272 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4273 pin_mut!(stream);
4274 assert_pending!(stream);
4275
4276 server
4277 .mock_sync()
4278 .ok_and_run(&client, |builder| {
4279 builder.add_custom_global_account_data(json!({
4280 "content": {
4281 "media_previews": "off",
4282 "invite_avatars": "on"
4283 },
4284 "type": "io.element.msc4278.media_preview_config"
4285 }));
4286 })
4287 .await;
4288
4289 assert_next_matches!(
4290 stream,
4291 MediaPreviewConfigEventContent {
4292 media_previews: Some(MediaPreviews::Off),
4293 invite_avatars: Some(InviteAvatars::On),
4294 ..
4295 }
4296 );
4297 assert_pending!(stream);
4298 }
4299
4300 #[async_test]
4301 async fn test_media_preview_config_not_found() {
4302 let server = MatrixMockServer::new().await;
4303 let client = server.client_builder().build().await;
4304
4305 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4306
4307 assert!(initial_value.is_none());
4308 }
4309
4310 #[async_test]
4311 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4312 // The default Matrix version we use is 1.11 or higher, so authenticated media
4313 // is supported.
4314 let server = MatrixMockServer::new().await;
4315 let client = server.client_builder().build().await;
4316
4317 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4318
4319 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4320 client.load_or_fetch_max_upload_size().await.unwrap();
4321
4322 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4323 }
4324
4325 #[async_test]
4326 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4327 // The server must advertise support for the stable feature for authenticated
4328 // media support, so we mock the `GET /versions` response.
4329 let server = MatrixMockServer::new().await;
4330 let client = server.client_builder().no_server_versions().build().await;
4331
4332 server
4333 .mock_versions()
4334 .ok_custom(
4335 &["v1.7", "v1.8", "v1.9", "v1.10"],
4336 &[("org.matrix.msc3916.stable", true)].into(),
4337 )
4338 .named("versions")
4339 .expect(1)
4340 .mount()
4341 .await;
4342
4343 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4344
4345 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4346 client.load_or_fetch_max_upload_size().await.unwrap();
4347
4348 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4349 }
4350
4351 #[async_test]
4352 async fn test_load_or_fetch_max_upload_size_no_auth() {
4353 // The server must not support Matrix 1.11 or higher for unauthenticated
4354 // media requests, so we mock the `GET /versions` response.
4355 let server = MatrixMockServer::new().await;
4356 let client = server.client_builder().no_server_versions().build().await;
4357
4358 server
4359 .mock_versions()
4360 .ok_custom(&["v1.1"], &Default::default())
4361 .named("versions")
4362 .expect(1)
4363 .mount()
4364 .await;
4365
4366 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4367
4368 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4369 client.load_or_fetch_max_upload_size().await.unwrap();
4370
4371 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4372 }
4373
4374 #[async_test]
4375 async fn test_uploading_a_too_large_media_file() {
4376 let server = MatrixMockServer::new().await;
4377 let client = server.client_builder().build().await;
4378
4379 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4380 client.load_or_fetch_max_upload_size().await.unwrap();
4381 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4382
4383 let data = vec![1, 2];
4384 let upload_request =
4385 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4386 let request = SendRequest {
4387 client: client.clone(),
4388 request: upload_request,
4389 config: None,
4390 send_progress: SharedObservable::new(TransmissionProgress::default()),
4391 };
4392 let media_request = SendMediaUploadRequest::new(request);
4393
4394 let error = media_request.await.err();
4395 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4396 assert_eq!(max, uint!(1));
4397 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4398 }
4399
4400 #[async_test]
4401 async fn test_dont_ignore_timeout_on_first_sync() {
4402 let server = MatrixMockServer::new().await;
4403 let client = server.client_builder().build().await;
4404
4405 server
4406 .mock_sync()
4407 .timeout(Some(Duration::from_secs(30)))
4408 .ok(|_| {})
4409 .mock_once()
4410 .named("sync_with_timeout")
4411 .mount()
4412 .await;
4413
4414 // Call the endpoint once to check the timeout.
4415 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4416
4417 timeout(Duration::from_secs(1), async {
4418 stream.next().await.unwrap().unwrap();
4419 })
4420 .await
4421 .unwrap();
4422 }
4423
4424 #[async_test]
4425 async fn test_ignore_timeout_on_first_sync() {
4426 let server = MatrixMockServer::new().await;
4427 let client = server.client_builder().build().await;
4428
4429 server
4430 .mock_sync()
4431 .timeout(None)
4432 .ok(|_| {})
4433 .mock_once()
4434 .named("sync_no_timeout")
4435 .mount()
4436 .await;
4437 server
4438 .mock_sync()
4439 .timeout(Some(Duration::from_secs(30)))
4440 .ok(|_| {})
4441 .mock_once()
4442 .named("sync_with_timeout")
4443 .mount()
4444 .await;
4445
4446 // Call each version of the endpoint once to check the timeouts.
4447 let mut stream = Box::pin(
4448 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4449 );
4450
4451 timeout(Duration::from_secs(1), async {
4452 stream.next().await.unwrap().unwrap();
4453 stream.next().await.unwrap().unwrap();
4454 })
4455 .await
4456 .unwrap();
4457 }
4458
4459 #[async_test]
4460 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4461 let server = MatrixMockServer::new().await;
4462 let client = server.client_builder().build().await;
4463 // This is the user ID that is inside MemberAdditional.
4464 // Note the confusing username, so we can share
4465 // GlobalAccountDataTestEvent::Direct with the invited test.
4466 let user_id = user_id!("@invited:localhost");
4467
4468 // When we receive a sync response saying "invited" is invited to a DM
4469 let f = EventFactory::new();
4470 let response = SyncResponseBuilder::default()
4471 .add_joined_room(
4472 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
4473 )
4474 .add_global_account_data(
4475 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4476 )
4477 .build_sync_response();
4478 client.base_client().receive_sync_response(response).await.unwrap();
4479
4480 // Then get_dm_room finds this room
4481 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4482 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4483 }
4484
4485 #[async_test]
4486 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4487 let server = MatrixMockServer::new().await;
4488 let client = server.client_builder().build().await;
4489 // This is the user ID that is inside MemberInvite
4490 let user_id = user_id!("@invited:localhost");
4491
4492 // When we receive a sync response saying "invited" is invited to a DM
4493 let f = EventFactory::new();
4494 let response = SyncResponseBuilder::default()
4495 .add_joined_room(
4496 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
4497 )
4498 .add_global_account_data(
4499 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4500 )
4501 .build_sync_response();
4502 client.base_client().receive_sync_response(response).await.unwrap();
4503
4504 // Then get_dm_room finds this room
4505 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4506 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4507 }
4508
4509 #[async_test]
4510 async fn test_get_dm_room_still_finds_left_room() {
4511 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4512 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4513
4514 let server = MatrixMockServer::new().await;
4515 let client = server.client_builder().build().await;
4516 // This is the user ID that is inside MemberAdditional.
4517 // Note the confusing username, so we can share
4518 // GlobalAccountDataTestEvent::Direct with the invited test.
4519 let user_id = user_id!("@invited:localhost");
4520
4521 // When we receive a sync response saying "invited" is invited to a DM
4522 let f = EventFactory::new();
4523 let response = SyncResponseBuilder::default()
4524 .add_joined_room(
4525 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
4526 )
4527 .add_global_account_data(
4528 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4529 )
4530 .build_sync_response();
4531 client.base_client().receive_sync_response(response).await.unwrap();
4532
4533 // Then get_dm_room finds this room
4534 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4535 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4536 }
4537
4538 #[async_test]
4539 async fn test_device_exists() {
4540 let server = MatrixMockServer::new().await;
4541 let client = server.client_builder().build().await;
4542
4543 server.mock_get_device().ok().expect(1).mount().await;
4544
4545 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4546 }
4547
4548 #[async_test]
4549 async fn test_device_exists_404() {
4550 let server = MatrixMockServer::new().await;
4551 let client = server.client_builder().build().await;
4552
4553 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4554 }
4555
4556 #[async_test]
4557 async fn test_device_exists_500() {
4558 let server = MatrixMockServer::new().await;
4559 let client = server.client_builder().build().await;
4560
4561 server.mock_get_device().error500().expect(1).mount().await;
4562
4563 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4564 }
4565
4566 #[async_test]
4567 async fn test_fetching_well_known_with_homeserver_url() {
4568 let server = MatrixMockServer::new().await;
4569 let client = server.client_builder().build().await;
4570 server.mock_well_known().ok().mount().await;
4571
4572 assert_matches!(client.fetch_client_well_known().await, Some(_));
4573 }
4574
4575 #[async_test]
4576 async fn test_fetching_well_known_with_server_name() {
4577 let server = MatrixMockServer::new().await;
4578 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4579
4580 server.mock_well_known().ok().mount().await;
4581
4582 let client = MockClientBuilder::new(None)
4583 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4584 .build()
4585 .await;
4586
4587 assert_matches!(client.fetch_client_well_known().await, Some(_));
4588 }
4589
4590 #[async_test]
4591 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4592 let server = MatrixMockServer::new().await;
4593 server.mock_well_known().ok().mount().await;
4594
4595 let user_id =
4596 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4597 let client = MockClientBuilder::new(None)
4598 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4599 .build()
4600 .await;
4601
4602 assert_matches!(client.fetch_client_well_known().await, Some(_));
4603 }
4604}