matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32 DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
36 StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
37 event_cache::store::EventCacheStoreLock,
38 media::store::MediaStoreLock,
39 store::{
40 DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
41 WellKnownResponse,
42 },
43 sync::{Notification, RoomUpdates},
44 task_monitor::TaskMonitor,
45};
46use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl_cache::TtlCache};
47#[cfg(feature = "e2e-encryption")]
48use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
49use ruma::{
50 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
51 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
52 api::{
53 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
54 client::{
55 account::whoami,
56 alias::{create_alias, delete_alias, get_alias},
57 authenticated_media,
58 device::{self, delete_devices, get_devices, update_device},
59 directory::{get_public_rooms, get_public_rooms_filtered},
60 discovery::{
61 discover_homeserver::{self, RtcFocusInfo},
62 get_supported_versions,
63 },
64 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
65 knock::knock_room,
66 media,
67 membership::{join_room_by_id, join_room_by_id_or_alias},
68 room::create_room,
69 session::login::v3::DiscoveryInfo,
70 sync::sync_events,
71 threads::get_thread_subscriptions_changes,
72 uiaa,
73 user_directory::search_users,
74 },
75 error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
76 path_builder::PathBuilder,
77 },
78 assign,
79 events::direct::DirectUserIdentifier,
80 push::Ruleset,
81 time::Instant,
82};
83use serde::de::DeserializeOwned;
84use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
85use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
86use url::Url;
87
88use self::{
89 caches::{CachedValue, ClientCaches},
90 futures::SendRequest,
91};
92use crate::{
93 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
94 Room, SessionTokens, TransmissionProgress,
95 authentication::{
96 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
97 oauth::OAuth,
98 },
99 client::{
100 homeserver_capabilities::HomeserverCapabilities,
101 thread_subscriptions::ThreadSubscriptionCatchup,
102 },
103 config::{RequestConfig, SyncToken},
104 deduplicating_handler::DeduplicatingHandler,
105 error::HttpResult,
106 event_cache::EventCache,
107 event_handler::{
108 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
109 EventHandlerStore, ObservableEventHandler, SyncEvent,
110 },
111 http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
112 latest_events::LatestEvents,
113 media::MediaError,
114 notification_settings::NotificationSettings,
115 room::RoomMember,
116 room_preview::RoomPreview,
117 send_queue::{SendQueue, SendQueueData},
118 sliding_sync::Version as SlidingSyncVersion,
119 sync::{RoomUpdate, SyncResponse},
120};
121#[cfg(feature = "e2e-encryption")]
122use crate::{
123 cross_process_lock::CrossProcessLock,
124 encryption::{
125 DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
126 VerificationState,
127 },
128};
129
130mod builder;
131pub(crate) mod caches;
132pub(crate) mod futures;
133pub(crate) mod homeserver_capabilities;
134pub(crate) mod thread_subscriptions;
135
136pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
137#[cfg(feature = "experimental-search")]
138use crate::search_index::SearchIndex;
139
140#[cfg(not(target_family = "wasm"))]
141type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
142#[cfg(target_family = "wasm")]
143type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
144
145#[cfg(not(target_family = "wasm"))]
146type NotificationHandlerFn =
147 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
148#[cfg(target_family = "wasm")]
149type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
150
151/// Enum controlling if a loop running callbacks should continue or abort.
152///
153/// This is mainly used in the [`sync_with_callback`] method, the return value
154/// of the provided callback controls if the sync loop should be exited.
155///
156/// [`sync_with_callback`]: #method.sync_with_callback
157#[derive(Debug, Clone, Copy, PartialEq, Eq)]
158pub enum LoopCtrl {
159 /// Continue running the loop.
160 Continue,
161 /// Break out of the loop.
162 Break,
163}
164
165/// Represents changes that can occur to a `Client`s `Session`.
166#[derive(Debug, Clone, PartialEq)]
167pub enum SessionChange {
168 /// The session's token is no longer valid.
169 UnknownToken(UnknownTokenErrorData),
170 /// The session's tokens have been refreshed.
171 TokensRefreshed,
172}
173
174/// Information about the server vendor obtained from the federation API.
175#[derive(Debug, Clone, PartialEq, Eq)]
176#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
177pub struct ServerVendorInfo {
178 /// The server name.
179 pub server_name: String,
180 /// The server version.
181 pub version: String,
182}
183
184/// An async/await enabled Matrix client.
185///
186/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
187#[derive(Clone)]
188pub struct Client {
189 pub(crate) inner: Arc<ClientInner>,
190}
191
192#[derive(Default)]
193pub(crate) struct ClientLocks {
194 /// Lock ensuring that only a single room may be marked as a DM at once.
195 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
196 /// explanation.
197 pub(crate) mark_as_dm_lock: Mutex<()>,
198
199 /// Lock ensuring that only a single secret store is getting opened at the
200 /// same time.
201 ///
202 /// This is important so we don't accidentally create multiple different new
203 /// default secret storage keys.
204 #[cfg(feature = "e2e-encryption")]
205 pub(crate) open_secret_store_lock: Mutex<()>,
206
207 /// Lock ensuring that we're only storing a single secret at a time.
208 ///
209 /// Take a look at the [`SecretStore::put_secret`] method for a more
210 /// detailed explanation.
211 ///
212 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
213 #[cfg(feature = "e2e-encryption")]
214 pub(crate) store_secret_lock: Mutex<()>,
215
216 /// Lock ensuring that only one method at a time might modify our backup.
217 #[cfg(feature = "e2e-encryption")]
218 pub(crate) backup_modify_lock: Mutex<()>,
219
220 /// Lock ensuring that we're going to attempt to upload backups for a single
221 /// requester.
222 #[cfg(feature = "e2e-encryption")]
223 pub(crate) backup_upload_lock: Mutex<()>,
224
225 /// Handler making sure we only have one group session sharing request in
226 /// flight per room.
227 #[cfg(feature = "e2e-encryption")]
228 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
229
230 /// Lock making sure we're only doing one key claim request at a time.
231 #[cfg(feature = "e2e-encryption")]
232 pub(crate) key_claim_lock: Mutex<()>,
233
234 /// Handler to ensure that only one members request is running at a time,
235 /// given a room.
236 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
237
238 /// Handler to ensure that only one encryption state request is running at a
239 /// time, given a room.
240 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
241
242 /// Deduplicating handler for sending read receipts. The string is an
243 /// internal implementation detail, see [`Self::send_single_receipt`].
244 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
245
246 #[cfg(feature = "e2e-encryption")]
247 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
248}
249
250pub(crate) struct ClientInner {
251 /// All the data related to authentication and authorization.
252 pub(crate) auth_ctx: Arc<AuthCtx>,
253
254 /// The URL of the server.
255 ///
256 /// Not to be confused with the `Self::homeserver`. `server` is usually
257 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
258 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
259 /// homeserver (at the time of writing — 2024-08-28).
260 ///
261 /// This value is optional depending on how the `Client` has been built.
262 /// If it's been built from a homeserver URL directly, we don't know the
263 /// server. However, if the `Client` has been built from a server URL or
264 /// name, then the homeserver has been discovered, and we know both.
265 server: Option<Url>,
266
267 /// The URL of the homeserver to connect to.
268 ///
269 /// This is the URL for the client-server Matrix API.
270 homeserver: StdRwLock<Url>,
271
272 /// The sliding sync version.
273 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
274
275 /// The underlying HTTP client.
276 pub(crate) http_client: HttpClient,
277
278 /// User session data.
279 pub(super) base_client: BaseClient,
280
281 /// Collection of in-memory caches for the [`Client`].
282 pub(crate) caches: ClientCaches,
283
284 /// Collection of locks individual client methods might want to use, either
285 /// to ensure that only a single call to a method happens at once or to
286 /// deduplicate multiple calls to a method.
287 pub(crate) locks: ClientLocks,
288
289 /// The cross-process lock configuration.
290 ///
291 /// The SDK provides cross-process store locks (see
292 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
293 /// [`CrossProcessLockConfig::MultiProcess`] is used.
294 ///
295 /// If multiple `Client`s are running in different processes, this
296 /// value MUST be different for each `Client`.
297 cross_process_lock_config: CrossProcessLockConfig,
298
299 /// A mapping of the times at which the current user sent typing notices,
300 /// keyed by room.
301 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
302
303 /// Event handlers. See `add_event_handler`.
304 pub(crate) event_handlers: EventHandlerStore,
305
306 /// Notification handlers. See `register_notification_handler`.
307 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
308
309 /// The sender-side of channels used to receive room updates.
310 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
311
312 /// The sender-side of a channel used to observe all the room updates of a
313 /// sync response.
314 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
315
316 /// Whether the client should update its homeserver URL with the discovery
317 /// information present in the login response.
318 respect_login_well_known: bool,
319
320 /// An event that can be listened on to wait for a successful sync. The
321 /// event will only be fired if a sync loop is running. Can be used for
322 /// synchronization, e.g. if we send out a request to create a room, we can
323 /// wait for the sync to get the data to fetch a room object from the state
324 /// store.
325 pub(crate) sync_beat: event_listener::Event,
326
327 /// A central cache for events, inactive first.
328 ///
329 /// It becomes active when [`EventCache::subscribe`] is called.
330 pub(crate) event_cache: OnceCell<EventCache>,
331
332 /// End-to-end encryption related state.
333 #[cfg(feature = "e2e-encryption")]
334 pub(crate) e2ee: EncryptionData,
335
336 /// The verification state of our own device.
337 #[cfg(feature = "e2e-encryption")]
338 pub(crate) verification_state: SharedObservable<VerificationState>,
339
340 /// Whether to enable the experimental support for sending and receiving
341 /// encrypted room history on invite, per [MSC4268].
342 ///
343 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
344 #[cfg(feature = "e2e-encryption")]
345 pub(crate) enable_share_history_on_invite: bool,
346
347 /// Data related to the [`SendQueue`].
348 ///
349 /// [`SendQueue`]: crate::send_queue::SendQueue
350 pub(crate) send_queue_data: Arc<SendQueueData>,
351
352 /// The `max_upload_size` value of the homeserver, it contains the max
353 /// request size you can send.
354 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
355
356 /// The entry point to get the [`LatestEvent`] of rooms and threads.
357 ///
358 /// [`LatestEvent`]: crate::latest_event::LatestEvent
359 latest_events: OnceCell<LatestEvents>,
360
361 /// Service handling the catching up of thread subscriptions in the
362 /// background.
363 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
364
365 #[cfg(feature = "experimental-search")]
366 /// Handler for [`RoomIndex`]'s of each room
367 search_index: SearchIndex,
368
369 /// A monitor for background tasks spawned by the client.
370 pub(crate) task_monitor: TaskMonitor,
371
372 /// A sender to notify subscribers about duplicate key upload errors
373 /// triggered by requests to /keys/upload.
374 #[cfg(feature = "e2e-encryption")]
375 pub(crate) duplicate_key_upload_error_sender:
376 broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
377}
378
379impl ClientInner {
380 /// Create a new `ClientInner`.
381 ///
382 /// All the fields passed as parameters here are those that must be cloned
383 /// upon instantiation of a sub-client, e.g. a client specialized for
384 /// notifications.
385 #[allow(clippy::too_many_arguments)]
386 async fn new(
387 auth_ctx: Arc<AuthCtx>,
388 server: Option<Url>,
389 homeserver: Url,
390 sliding_sync_version: SlidingSyncVersion,
391 http_client: HttpClient,
392 base_client: BaseClient,
393 supported_versions: CachedValue<SupportedVersions>,
394 well_known: CachedValue<Option<WellKnownResponse>>,
395 respect_login_well_known: bool,
396 event_cache: OnceCell<EventCache>,
397 send_queue: Arc<SendQueueData>,
398 latest_events: OnceCell<LatestEvents>,
399 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
400 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
401 cross_process_lock_config: CrossProcessLockConfig,
402 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
403 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
404 ) -> Arc<Self> {
405 let caches = ClientCaches {
406 supported_versions: supported_versions.into(),
407 well_known: well_known.into(),
408 server_metadata: Mutex::new(TtlCache::new()),
409 };
410
411 let client = Self {
412 server,
413 homeserver: StdRwLock::new(homeserver),
414 auth_ctx,
415 sliding_sync_version: StdRwLock::new(sliding_sync_version),
416 http_client,
417 base_client,
418 caches,
419 locks: Default::default(),
420 cross_process_lock_config,
421 typing_notice_times: Default::default(),
422 event_handlers: Default::default(),
423 notification_handlers: Default::default(),
424 room_update_channels: Default::default(),
425 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
426 // ballast for all observers to catch up.
427 room_updates_sender: broadcast::Sender::new(32),
428 respect_login_well_known,
429 sync_beat: event_listener::Event::new(),
430 event_cache,
431 send_queue_data: send_queue,
432 latest_events,
433 #[cfg(feature = "e2e-encryption")]
434 e2ee: EncryptionData::new(encryption_settings),
435 #[cfg(feature = "e2e-encryption")]
436 verification_state: SharedObservable::new(VerificationState::Unknown),
437 #[cfg(feature = "e2e-encryption")]
438 enable_share_history_on_invite,
439 server_max_upload_size: Mutex::new(OnceCell::new()),
440 #[cfg(feature = "experimental-search")]
441 search_index: search_index_handler,
442 thread_subscription_catchup,
443 task_monitor: TaskMonitor::new(),
444 #[cfg(feature = "e2e-encryption")]
445 duplicate_key_upload_error_sender: broadcast::channel(1).0,
446 };
447
448 #[allow(clippy::let_and_return)]
449 let client = Arc::new(client);
450
451 #[cfg(feature = "e2e-encryption")]
452 client.e2ee.initialize_tasks(&client);
453
454 let _ = client
455 .event_cache
456 .get_or_init(|| async {
457 EventCache::new(&client, client.base_client.event_cache_store().clone())
458 })
459 .await;
460
461 let _ = client
462 .thread_subscription_catchup
463 .get_or_init(|| async {
464 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
465 })
466 .await;
467
468 client
469 }
470}
471
472#[cfg(not(tarpaulin_include))]
473impl Debug for Client {
474 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
475 write!(fmt, "Client")
476 }
477}
478
479impl Client {
480 /// Create a new [`Client`] that will use the given homeserver.
481 ///
482 /// # Arguments
483 ///
484 /// * `homeserver_url` - The homeserver that the client should connect to.
485 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
486 Self::builder().homeserver_url(homeserver_url).build().await
487 }
488
489 /// Returns a subscriber that publishes an event every time the ignore user
490 /// list changes.
491 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
492 self.inner.base_client.subscribe_to_ignore_user_list_changes()
493 }
494
495 /// Create a new [`ClientBuilder`].
496 pub fn builder() -> ClientBuilder {
497 ClientBuilder::new()
498 }
499
500 pub(crate) fn base_client(&self) -> &BaseClient {
501 &self.inner.base_client
502 }
503
504 /// The underlying HTTP client.
505 pub fn http_client(&self) -> &reqwest::Client {
506 &self.inner.http_client.inner
507 }
508
509 pub(crate) fn locks(&self) -> &ClientLocks {
510 &self.inner.locks
511 }
512
513 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
514 &self.inner.auth_ctx
515 }
516
517 /// The cross-process store lock configuration used by this [`Client`].
518 ///
519 /// The SDK provides cross-process store locks (see
520 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
521 /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
522 /// the value used for all cross-process store locks used by this
523 /// `Client`.
524 pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
525 &self.inner.cross_process_lock_config
526 }
527
528 /// Change the homeserver URL used by this client.
529 ///
530 /// # Arguments
531 ///
532 /// * `homeserver_url` - The new URL to use.
533 fn set_homeserver(&self, homeserver_url: Url) {
534 *self.inner.homeserver.write().unwrap() = homeserver_url;
535 }
536
537 /// Change to a different homeserver and re-resolve well-known.
538 #[cfg(feature = "e2e-encryption")]
539 pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
540 &self,
541 homeserver_url: Url,
542 ) -> Result<()> {
543 self.set_homeserver(homeserver_url);
544 self.reset_well_known().await?;
545 if let Some(well_known) = self.load_or_fetch_well_known().await? {
546 self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
547 }
548 Ok(())
549 }
550
551 /// Retrieves a helper component to access the [`HomeserverCapabilities`]
552 /// supported or disabled by the homeserver.
553 pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
554 HomeserverCapabilities::new(self.clone())
555 }
556
557 /// Get the server vendor information from the federation API.
558 ///
559 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
560 /// both the server's software name and version.
561 ///
562 /// # Examples
563 ///
564 /// ```no_run
565 /// # use matrix_sdk::Client;
566 /// # use url::Url;
567 /// # async {
568 /// # let homeserver = Url::parse("http://example.com")?;
569 /// let client = Client::new(homeserver).await?;
570 ///
571 /// let server_info = client.server_vendor_info(None).await?;
572 /// println!(
573 /// "Server: {}, Version: {}",
574 /// server_info.server_name, server_info.version
575 /// );
576 /// # anyhow::Ok(()) };
577 /// ```
578 #[cfg(feature = "federation-api")]
579 pub async fn server_vendor_info(
580 &self,
581 request_config: Option<RequestConfig>,
582 ) -> HttpResult<ServerVendorInfo> {
583 use ruma::api::federation::discovery::get_server_version;
584
585 let res = self
586 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
587 .await?;
588
589 // Extract server info, using defaults if fields are missing.
590 let server = res.server.unwrap_or_default();
591 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
592 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
593
594 Ok(ServerVendorInfo { server_name: server_name_str, version })
595 }
596
597 /// Get a copy of the default request config.
598 ///
599 /// The default request config is what's used when sending requests if no
600 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
601 /// function with such a parameter.
602 ///
603 /// If the default request config was not customized through
604 /// [`ClientBuilder`] when creating this `Client`, the returned value will
605 /// be equivalent to [`RequestConfig::default()`].
606 pub fn request_config(&self) -> RequestConfig {
607 self.inner.http_client.request_config
608 }
609
610 /// Check whether the client has been activated.
611 ///
612 /// A client is considered active when:
613 ///
614 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
615 /// is logged in,
616 /// 2. Has loaded cached data from storage,
617 /// 3. If encryption is enabled, it also initialized or restored its
618 /// `OlmMachine`.
619 pub fn is_active(&self) -> bool {
620 self.inner.base_client.is_active()
621 }
622
623 /// The server used by the client.
624 ///
625 /// See `Self::server` to learn more.
626 pub fn server(&self) -> Option<&Url> {
627 self.inner.server.as_ref()
628 }
629
630 /// The homeserver of the client.
631 pub fn homeserver(&self) -> Url {
632 self.inner.homeserver.read().unwrap().clone()
633 }
634
635 /// Get the sliding sync version.
636 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
637 self.inner.sliding_sync_version.read().unwrap().clone()
638 }
639
640 /// Override the sliding sync version.
641 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
642 let mut lock = self.inner.sliding_sync_version.write().unwrap();
643 *lock = version;
644 }
645
646 /// Get the Matrix user session meta information.
647 ///
648 /// If the client is currently logged in, this will return a
649 /// [`SessionMeta`] object which contains the user ID and device ID.
650 /// Otherwise it returns `None`.
651 pub fn session_meta(&self) -> Option<&SessionMeta> {
652 self.base_client().session_meta()
653 }
654
655 /// Returns a receiver that gets events for each room info update. To watch
656 /// for new events, use `receiver.resubscribe()`.
657 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
658 self.base_client().room_info_notable_update_receiver()
659 }
660
661 /// Performs a search for users.
662 /// The search is performed case-insensitively on user IDs and display names
663 ///
664 /// # Arguments
665 ///
666 /// * `search_term` - The search term for the search
667 /// * `limit` - The maximum number of results to return. Defaults to 10.
668 ///
669 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
670 pub async fn search_users(
671 &self,
672 search_term: &str,
673 limit: u64,
674 ) -> HttpResult<search_users::v3::Response> {
675 let mut request = search_users::v3::Request::new(search_term.to_owned());
676
677 if let Some(limit) = UInt::new(limit) {
678 request.limit = limit;
679 }
680
681 self.send(request).await
682 }
683
684 /// Get the user id of the current owner of the client.
685 pub fn user_id(&self) -> Option<&UserId> {
686 self.session_meta().map(|s| s.user_id.as_ref())
687 }
688
689 /// Get the device ID that identifies the current session.
690 pub fn device_id(&self) -> Option<&DeviceId> {
691 self.session_meta().map(|s| s.device_id.as_ref())
692 }
693
694 /// Get the current access token for this session.
695 ///
696 /// Will be `None` if the client has not been logged in.
697 pub fn access_token(&self) -> Option<String> {
698 self.auth_ctx().access_token()
699 }
700
701 /// Get the current tokens for this session.
702 ///
703 /// To be notified of changes in the session tokens, use
704 /// [`Client::subscribe_to_session_changes()`] or
705 /// [`Client::set_session_callbacks()`].
706 ///
707 /// Returns `None` if the client has not been logged in.
708 pub fn session_tokens(&self) -> Option<SessionTokens> {
709 self.auth_ctx().session_tokens()
710 }
711
712 /// Access the authentication API used to log in this client.
713 ///
714 /// Will be `None` if the client has not been logged in.
715 pub fn auth_api(&self) -> Option<AuthApi> {
716 match self.auth_ctx().auth_data.get()? {
717 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
718 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
719 }
720 }
721
722 /// Get the whole session info of this client.
723 ///
724 /// Will be `None` if the client has not been logged in.
725 ///
726 /// Can be used with [`Client::restore_session`] to restore a previously
727 /// logged-in session.
728 pub fn session(&self) -> Option<AuthSession> {
729 match self.auth_api()? {
730 AuthApi::Matrix(api) => api.session().map(Into::into),
731 AuthApi::OAuth(api) => api.full_session().map(Into::into),
732 }
733 }
734
735 /// Get a reference to the state store.
736 pub fn state_store(&self) -> &DynStateStore {
737 self.base_client().state_store()
738 }
739
740 /// Get a reference to the event cache store.
741 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
742 self.base_client().event_cache_store()
743 }
744
745 /// Get a reference to the media store.
746 pub fn media_store(&self) -> &MediaStoreLock {
747 self.base_client().media_store()
748 }
749
750 /// Access the native Matrix authentication API with this client.
751 pub fn matrix_auth(&self) -> MatrixAuth {
752 MatrixAuth::new(self.clone())
753 }
754
755 /// Get the account of the current owner of the client.
756 pub fn account(&self) -> Account {
757 Account::new(self.clone())
758 }
759
760 /// Get the encryption manager of the client.
761 #[cfg(feature = "e2e-encryption")]
762 pub fn encryption(&self) -> Encryption {
763 Encryption::new(self.clone())
764 }
765
766 /// Get the media manager of the client.
767 pub fn media(&self) -> Media {
768 Media::new(self.clone())
769 }
770
771 /// Get the pusher manager of the client.
772 pub fn pusher(&self) -> Pusher {
773 Pusher::new(self.clone())
774 }
775
776 /// Access the OAuth 2.0 API of the client.
777 pub fn oauth(&self) -> OAuth {
778 OAuth::new(self.clone())
779 }
780
781 /// Register a handler for a specific event type.
782 ///
783 /// The handler is a function or closure with one or more arguments. The
784 /// first argument is the event itself. All additional arguments are
785 /// "context" arguments: They have to implement [`EventHandlerContext`].
786 /// This trait is named that way because most of the types implementing it
787 /// give additional context about an event: The room it was in, its raw form
788 /// and other similar things. As two exceptions to this,
789 /// [`Client`] and [`EventHandlerHandle`] also implement the
790 /// `EventHandlerContext` trait so you don't have to clone your client
791 /// into the event handler manually and a handler can decide to remove
792 /// itself.
793 ///
794 /// Some context arguments are not universally applicable. A context
795 /// argument that isn't available for the given event type will result in
796 /// the event handler being skipped and an error being logged. The following
797 /// context argument types are only available for a subset of event types:
798 ///
799 /// * [`Room`] is only available for room-specific events, i.e. not for
800 /// events like global account data events or presence events.
801 ///
802 /// You can provide custom context via
803 /// [`add_event_handler_context`](Client::add_event_handler_context) and
804 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
805 /// into the event handler.
806 ///
807 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
808 ///
809 /// # Examples
810 ///
811 /// ```no_run
812 /// use matrix_sdk::{
813 /// deserialized_responses::EncryptionInfo,
814 /// event_handler::Ctx,
815 /// ruma::{
816 /// events::{
817 /// macros::EventContent,
818 /// push_rules::PushRulesEvent,
819 /// room::{
820 /// message::SyncRoomMessageEvent,
821 /// topic::SyncRoomTopicEvent,
822 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
823 /// },
824 /// },
825 /// push::Action,
826 /// Int, MilliSecondsSinceUnixEpoch,
827 /// },
828 /// Client, Room,
829 /// };
830 /// use serde::{Deserialize, Serialize};
831 ///
832 /// # async fn example(client: Client) {
833 /// client.add_event_handler(
834 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
835 /// // Common usage: Room event plus room and client.
836 /// },
837 /// );
838 /// client.add_event_handler(
839 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
840 /// async move {
841 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
842 /// // unencrypted events and events that were decrypted by the SDK.
843 /// }
844 /// },
845 /// );
846 /// client.add_event_handler(
847 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
848 /// async move {
849 /// // A `Vec<Action>` parameter allows you to know which push actions
850 /// // are applicable for an event. For example, an event with
851 /// // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
852 /// // should be highlighted in the timeline.
853 /// }
854 /// },
855 /// );
856 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
857 /// // You can omit any or all arguments after the first.
858 /// });
859 ///
860 /// // Registering a temporary event handler:
861 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
862 /// /* Event handler */
863 /// });
864 /// client.remove_event_handler(handle);
865 ///
866 /// // Registering custom event handler context:
867 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
868 /// struct MyContext {
869 /// number: usize,
870 /// }
871 /// client.add_event_handler_context(MyContext { number: 5 });
872 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
873 /// // Use the context
874 /// });
875 ///
876 /// // This will handle membership events in joined rooms. Invites are special, see below.
877 /// client.add_event_handler(
878 /// |ev: SyncRoomMemberEvent| async move {},
879 /// );
880 ///
881 /// // To handle state events in invited rooms (including invite membership events),
882 /// // `StrippedRoomMemberEvent` should be used.
883 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
884 /// client.add_event_handler(
885 /// |ev: StrippedRoomMemberEvent| async move {},
886 /// );
887 ///
888 /// // Custom events work exactly the same way, you just need to declare
889 /// // the content struct and use the EventContent derive macro on it.
890 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
891 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
892 /// struct TokenEventContent {
893 /// token: String,
894 /// #[serde(rename = "exp")]
895 /// expires_at: MilliSecondsSinceUnixEpoch,
896 /// }
897 ///
898 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
899 /// todo!("Display the token");
900 /// });
901 ///
902 /// // Event handler closures can also capture local variables.
903 /// // Make sure they are cheap to clone though, because they will be cloned
904 /// // every time the closure is called.
905 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
906 ///
907 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
908 /// println!("Calling the handler with identifier {data}");
909 /// });
910 /// # }
911 /// ```
912 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
913 where
914 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
915 H: EventHandler<Ev, Ctx>,
916 {
917 self.add_event_handler_impl(handler, None)
918 }
919
920 /// Register a handler for a specific room, and event type.
921 ///
922 /// This method works the same way as
923 /// [`add_event_handler`][Self::add_event_handler], except that the handler
924 /// will only be called for events in the room with the specified ID. See
925 /// that method for more details on event handler functions.
926 ///
927 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
928 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
929 /// your use case.
930 pub fn add_room_event_handler<Ev, Ctx, H>(
931 &self,
932 room_id: &RoomId,
933 handler: H,
934 ) -> EventHandlerHandle
935 where
936 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
937 H: EventHandler<Ev, Ctx>,
938 {
939 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
940 }
941
942 /// Observe a specific event type.
943 ///
944 /// `Ev` represents the kind of event that will be observed. `Ctx`
945 /// represents the context that will come with the event. It relies on the
946 /// same mechanism as [`Client::add_event_handler`]. The main difference is
947 /// that it returns an [`ObservableEventHandler`] and doesn't require a
948 /// user-defined closure. It is possible to subscribe to the
949 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
950 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
951 /// Ctx)`.
952 ///
953 /// Be careful that only the most recent value can be observed. Subscribers
954 /// are notified when a new value is sent, but there is no guarantee
955 /// that they will see all values.
956 ///
957 /// # Example
958 ///
959 /// Let's see a classical usage:
960 ///
961 /// ```
962 /// use futures_util::StreamExt as _;
963 /// use matrix_sdk::{
964 /// Client, Room,
965 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
966 /// };
967 ///
968 /// # async fn example(client: Client) -> Option<()> {
969 /// let observer =
970 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
971 ///
972 /// let mut subscriber = observer.subscribe();
973 ///
974 /// let (event, (room, push_actions)) = subscriber.next().await?;
975 /// # Some(())
976 /// # }
977 /// ```
978 ///
979 /// Now let's see how to get several contexts that can be useful for you:
980 ///
981 /// ```
982 /// use matrix_sdk::{
983 /// Client, Room,
984 /// deserialized_responses::EncryptionInfo,
985 /// ruma::{
986 /// events::room::{
987 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
988 /// },
989 /// push::Action,
990 /// },
991 /// };
992 ///
993 /// # async fn example(client: Client) {
994 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
995 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
996 ///
997 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
998 /// // to distinguish between unencrypted events and events that were decrypted
999 /// // by the SDK.
1000 /// let _ = client
1001 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1002 /// );
1003 ///
1004 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1005 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1006 /// // should be highlighted in the timeline.
1007 /// let _ =
1008 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1009 ///
1010 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1011 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1012 /// # }
1013 /// ```
1014 ///
1015 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1016 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1017 where
1018 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1019 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1020 {
1021 self.observe_room_events_impl(None)
1022 }
1023
1024 /// Observe a specific room, and event type.
1025 ///
1026 /// This method works the same way as [`Client::observe_events`], except
1027 /// that the observability will only be applied for events in the room with
1028 /// the specified ID. See that method for more details.
1029 ///
1030 /// Be careful that only the most recent value can be observed. Subscribers
1031 /// are notified when a new value is sent, but there is no guarantee
1032 /// that they will see all values.
1033 pub fn observe_room_events<Ev, Ctx>(
1034 &self,
1035 room_id: &RoomId,
1036 ) -> ObservableEventHandler<(Ev, Ctx)>
1037 where
1038 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1039 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1040 {
1041 self.observe_room_events_impl(Some(room_id.to_owned()))
1042 }
1043
1044 /// Shared implementation for `Client::observe_events` and
1045 /// `Client::observe_room_events`.
1046 fn observe_room_events_impl<Ev, Ctx>(
1047 &self,
1048 room_id: Option<OwnedRoomId>,
1049 ) -> ObservableEventHandler<(Ev, Ctx)>
1050 where
1051 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1052 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1053 {
1054 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1055 // new value.
1056 let shared_observable = SharedObservable::new(None);
1057
1058 ObservableEventHandler::new(
1059 shared_observable.clone(),
1060 self.event_handler_drop_guard(self.add_event_handler_impl(
1061 move |event: Ev, context: Ctx| {
1062 shared_observable.set(Some((event, context)));
1063
1064 ready(())
1065 },
1066 room_id,
1067 )),
1068 )
1069 }
1070
1071 /// Remove the event handler associated with the handle.
1072 ///
1073 /// Note that you **must not** call `remove_event_handler` from the
1074 /// non-async part of an event handler, that is:
1075 ///
1076 /// ```ignore
1077 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1078 /// // ⚠ this will cause a deadlock ⚠
1079 /// client.remove_event_handler(handle);
1080 ///
1081 /// async move {
1082 /// // removing the event handler here is fine
1083 /// client.remove_event_handler(handle);
1084 /// }
1085 /// })
1086 /// ```
1087 ///
1088 /// Note also that handlers that remove themselves will still execute with
1089 /// events received in the same sync cycle.
1090 ///
1091 /// # Arguments
1092 ///
1093 /// `handle` - The [`EventHandlerHandle`] that is returned when
1094 /// registering the event handler with [`Client::add_event_handler`].
1095 ///
1096 /// # Examples
1097 ///
1098 /// ```no_run
1099 /// # use url::Url;
1100 /// # use tokio::sync::mpsc;
1101 /// #
1102 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1103 /// #
1104 /// use matrix_sdk::{
1105 /// Client, event_handler::EventHandlerHandle,
1106 /// ruma::events::room::member::SyncRoomMemberEvent,
1107 /// };
1108 /// #
1109 /// # futures_executor::block_on(async {
1110 /// # let client = matrix_sdk::Client::builder()
1111 /// # .homeserver_url(homeserver)
1112 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1113 /// # .build()
1114 /// # .await
1115 /// # .unwrap();
1116 ///
1117 /// client.add_event_handler(
1118 /// |ev: SyncRoomMemberEvent,
1119 /// client: Client,
1120 /// handle: EventHandlerHandle| async move {
1121 /// // Common usage: Check arriving Event is the expected one
1122 /// println!("Expected RoomMemberEvent received!");
1123 /// client.remove_event_handler(handle);
1124 /// },
1125 /// );
1126 /// # });
1127 /// ```
1128 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1129 self.inner.event_handlers.remove(handle);
1130 }
1131
1132 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1133 /// the given handle.
1134 ///
1135 /// When the returned value is dropped, the event handler will be removed.
1136 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1137 EventHandlerDropGuard::new(handle, self.clone())
1138 }
1139
1140 /// Add an arbitrary value for use as event handler context.
1141 ///
1142 /// The value can be obtained in an event handler by adding an argument of
1143 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1144 ///
1145 /// If a value of the same type has been added before, it will be
1146 /// overwritten.
1147 ///
1148 /// # Examples
1149 ///
1150 /// ```no_run
1151 /// use matrix_sdk::{
1152 /// Room, event_handler::Ctx,
1153 /// ruma::events::room::message::SyncRoomMessageEvent,
1154 /// };
1155 /// # #[derive(Clone)]
1156 /// # struct SomeType;
1157 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1158 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1159 /// # futures_executor::block_on(async {
1160 /// # let client = matrix_sdk::Client::builder()
1161 /// # .homeserver_url(homeserver)
1162 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1163 /// # .build()
1164 /// # .await
1165 /// # .unwrap();
1166 ///
1167 /// // Handle used to send messages to the UI part of the app
1168 /// let my_gui_handle: SomeType = obtain_gui_handle();
1169 ///
1170 /// client.add_event_handler_context(my_gui_handle.clone());
1171 /// client.add_event_handler(
1172 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1173 /// async move {
1174 /// // gui_handle.send(DisplayMessage { message: ev });
1175 /// }
1176 /// },
1177 /// );
1178 /// # });
1179 /// ```
1180 pub fn add_event_handler_context<T>(&self, ctx: T)
1181 where
1182 T: Clone + Send + Sync + 'static,
1183 {
1184 self.inner.event_handlers.add_context(ctx);
1185 }
1186
1187 /// Register a handler for a notification.
1188 ///
1189 /// Similar to [`Client::add_event_handler`], but only allows functions
1190 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1191 /// [`Client`] for now.
1192 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1193 where
1194 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1195 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1196 {
1197 self.inner.notification_handlers.write().await.push(Box::new(
1198 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1199 ));
1200
1201 self
1202 }
1203
1204 /// Subscribe to all updates for the room with the given ID.
1205 ///
1206 /// The returned receiver will receive a new message for each sync response
1207 /// that contains updates for that room.
1208 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1209 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1210 btree_map::Entry::Vacant(entry) => {
1211 let (tx, rx) = broadcast::channel(8);
1212 entry.insert(tx);
1213 rx
1214 }
1215 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1216 }
1217 }
1218
1219 /// Subscribe to all updates to all rooms, whenever any has been received in
1220 /// a sync response.
1221 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1222 self.inner.room_updates_sender.subscribe()
1223 }
1224
1225 pub(crate) async fn notification_handlers(
1226 &self,
1227 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1228 self.inner.notification_handlers.read().await
1229 }
1230
1231 /// Get all the rooms the client knows about.
1232 ///
1233 /// This will return the list of joined, invited, and left rooms.
1234 pub fn rooms(&self) -> Vec<Room> {
1235 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1236 }
1237
1238 /// Get all the rooms the client knows about, filtered by room state.
1239 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1240 self.base_client()
1241 .rooms_filtered(filter)
1242 .into_iter()
1243 .map(|room| Room::new(self.clone(), room))
1244 .collect()
1245 }
1246
1247 /// Get a stream of all the rooms, in addition to the existing rooms.
1248 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1249 let (rooms, stream) = self.base_client().rooms_stream();
1250
1251 let map_room = |room| Room::new(self.clone(), room);
1252
1253 (
1254 rooms.into_iter().map(map_room).collect(),
1255 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1256 )
1257 }
1258
1259 /// Returns the joined rooms this client knows about.
1260 pub fn joined_rooms(&self) -> Vec<Room> {
1261 self.rooms_filtered(RoomStateFilter::JOINED)
1262 }
1263
1264 /// Returns the invited rooms this client knows about.
1265 pub fn invited_rooms(&self) -> Vec<Room> {
1266 self.rooms_filtered(RoomStateFilter::INVITED)
1267 }
1268
1269 /// Returns the left rooms this client knows about.
1270 pub fn left_rooms(&self) -> Vec<Room> {
1271 self.rooms_filtered(RoomStateFilter::LEFT)
1272 }
1273
1274 /// Returns the joined space rooms this client knows about.
1275 pub fn joined_space_rooms(&self) -> Vec<Room> {
1276 self.base_client()
1277 .rooms_filtered(RoomStateFilter::JOINED)
1278 .into_iter()
1279 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1280 .collect()
1281 }
1282
1283 /// Get a room with the given room id.
1284 ///
1285 /// # Arguments
1286 ///
1287 /// `room_id` - The unique id of the room that should be fetched.
1288 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1289 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1290 }
1291
1292 /// Gets the preview of a room, whether the current user has joined it or
1293 /// not.
1294 pub async fn get_room_preview(
1295 &self,
1296 room_or_alias_id: &RoomOrAliasId,
1297 via: Vec<OwnedServerName>,
1298 ) -> Result<RoomPreview> {
1299 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1300 Ok(room_id) => room_id.to_owned(),
1301 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1302 };
1303
1304 if let Some(room) = self.get_room(&room_id) {
1305 // The cached data can only be trusted if the room state is joined or
1306 // banned: for invite and knock rooms, no updates will be received
1307 // for the rooms after the invite/knock action took place so we may
1308 // have very out to date data for important fields such as
1309 // `join_rule`. For left rooms, the homeserver should return the latest info.
1310 match room.state() {
1311 RoomState::Joined | RoomState::Banned => {
1312 return Ok(RoomPreview::from_known_room(&room).await);
1313 }
1314 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1315 }
1316 }
1317
1318 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1319 }
1320
1321 /// Resolve a room alias to a room id and a list of servers which know
1322 /// about it.
1323 ///
1324 /// # Arguments
1325 ///
1326 /// `room_alias` - The room alias to be resolved.
1327 pub async fn resolve_room_alias(
1328 &self,
1329 room_alias: &RoomAliasId,
1330 ) -> HttpResult<get_alias::v3::Response> {
1331 let request = get_alias::v3::Request::new(room_alias.to_owned());
1332 self.send(request).await
1333 }
1334
1335 /// Checks if a room alias is not in use yet.
1336 ///
1337 /// Returns:
1338 /// - `Ok(true)` if the room alias is available.
1339 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1340 /// status code).
1341 /// - An `Err` otherwise.
1342 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1343 match self.resolve_room_alias(alias).await {
1344 // The room alias was resolved, so it's already in use.
1345 Ok(_) => Ok(false),
1346 Err(error) => {
1347 match error.client_api_error_kind() {
1348 // The room alias wasn't found, so it's available.
1349 Some(ErrorKind::NotFound) => Ok(true),
1350 _ => Err(error),
1351 }
1352 }
1353 }
1354 }
1355
1356 /// Adds a new room alias associated with a room to the room directory.
1357 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1358 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1359 self.send(request).await?;
1360 Ok(())
1361 }
1362
1363 /// Removes a room alias from the room directory.
1364 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1365 let request = delete_alias::v3::Request::new(alias.to_owned());
1366 self.send(request).await?;
1367 Ok(())
1368 }
1369
1370 /// Update the homeserver from the login response well-known if needed.
1371 ///
1372 /// # Arguments
1373 ///
1374 /// * `login_well_known` - The `well_known` field from a successful login
1375 /// response.
1376 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1377 if self.inner.respect_login_well_known
1378 && let Some(well_known) = login_well_known
1379 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1380 {
1381 self.set_homeserver(homeserver);
1382 }
1383 }
1384
1385 /// Similar to [`Client::restore_session_with`], with
1386 /// [`RoomLoadSettings::default()`].
1387 ///
1388 /// # Panics
1389 ///
1390 /// Panics if a session was already restored or logged in.
1391 #[instrument(skip_all)]
1392 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1393 self.restore_session_with(session, RoomLoadSettings::default()).await
1394 }
1395
1396 /// Restore a session previously logged-in using one of the available
1397 /// authentication APIs. The number of rooms to restore is controlled by
1398 /// [`RoomLoadSettings`].
1399 ///
1400 /// See the documentation of the corresponding authentication API's
1401 /// `restore_session` method for more information.
1402 ///
1403 /// # Panics
1404 ///
1405 /// Panics if a session was already restored or logged in.
1406 #[instrument(skip_all)]
1407 pub async fn restore_session_with(
1408 &self,
1409 session: impl Into<AuthSession>,
1410 room_load_settings: RoomLoadSettings,
1411 ) -> Result<()> {
1412 let session = session.into();
1413 match session {
1414 AuthSession::Matrix(session) => {
1415 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1416 }
1417 AuthSession::OAuth(session) => {
1418 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1419 }
1420 }
1421 }
1422
1423 /// Refresh the access token using the authentication API used to log into
1424 /// this session.
1425 ///
1426 /// See the documentation of the authentication API's `refresh_access_token`
1427 /// method for more information.
1428 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1429 let Some(auth_api) = self.auth_api() else {
1430 return Err(RefreshTokenError::RefreshTokenRequired);
1431 };
1432
1433 match auth_api {
1434 AuthApi::Matrix(api) => {
1435 trace!("Token refresh: Using the homeserver.");
1436 Box::pin(api.refresh_access_token()).await?;
1437 }
1438 AuthApi::OAuth(api) => {
1439 trace!("Token refresh: Using OAuth 2.0.");
1440 Box::pin(api.refresh_access_token()).await?;
1441 }
1442 }
1443
1444 Ok(())
1445 }
1446
1447 /// Log out the current session using the proper authentication API.
1448 ///
1449 /// # Errors
1450 ///
1451 /// Returns an error if the session is not authenticated or if an error
1452 /// occurred while making the request to the server.
1453 pub async fn logout(&self) -> Result<(), Error> {
1454 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1455 match auth_api {
1456 AuthApi::Matrix(matrix_auth) => {
1457 matrix_auth.logout().await?;
1458 Ok(())
1459 }
1460 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1461 }
1462 }
1463
1464 /// Get or upload a sync filter.
1465 ///
1466 /// This method will either get a filter ID from the store or upload the
1467 /// filter definition to the homeserver and return the new filter ID.
1468 ///
1469 /// # Arguments
1470 ///
1471 /// * `filter_name` - The unique name of the filter, this name will be used
1472 /// locally to store and identify the filter ID returned by the server.
1473 ///
1474 /// * `definition` - The filter definition that should be uploaded to the
1475 /// server if no filter ID can be found in the store.
1476 ///
1477 /// # Examples
1478 ///
1479 /// ```no_run
1480 /// # use matrix_sdk::{
1481 /// # Client, config::SyncSettings,
1482 /// # ruma::api::client::{
1483 /// # filter::{
1484 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1485 /// # },
1486 /// # sync::sync_events::v3::Filter,
1487 /// # }
1488 /// # };
1489 /// # use url::Url;
1490 /// # async {
1491 /// # let homeserver = Url::parse("http://example.com").unwrap();
1492 /// # let client = Client::new(homeserver).await.unwrap();
1493 /// let mut filter = FilterDefinition::default();
1494 ///
1495 /// // Let's enable member lazy loading.
1496 /// filter.room.state.lazy_load_options =
1497 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1498 ///
1499 /// let filter_id = client
1500 /// .get_or_upload_filter("sync", filter)
1501 /// .await
1502 /// .unwrap();
1503 ///
1504 /// let sync_settings = SyncSettings::new()
1505 /// .filter(Filter::FilterId(filter_id));
1506 ///
1507 /// let response = client.sync_once(sync_settings).await.unwrap();
1508 /// # };
1509 #[instrument(skip(self, definition))]
1510 pub async fn get_or_upload_filter(
1511 &self,
1512 filter_name: &str,
1513 definition: FilterDefinition,
1514 ) -> Result<String> {
1515 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1516 debug!("Found filter locally");
1517 Ok(filter)
1518 } else {
1519 debug!("Didn't find filter locally");
1520 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1521 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1522 let response = self.send(request).await?;
1523
1524 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1525
1526 Ok(response.filter_id)
1527 }
1528 }
1529
1530 /// Prepare to join a room by ID, by getting the current details about it
1531 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1532 let room = self.get_room(room_id)?;
1533
1534 let inviter = match room.invite_details().await {
1535 Ok(details) => details.inviter,
1536 Err(Error::WrongRoomState(_)) => None,
1537 Err(e) => {
1538 warn!("Error fetching invite details for room: {e:?}");
1539 None
1540 }
1541 };
1542
1543 Some(PreJoinRoomInfo { inviter })
1544 }
1545
1546 /// Finish joining a room.
1547 ///
1548 /// If the room was an invite that should be marked as a DM, will include it
1549 /// in the DM event after creating the joined room.
1550 ///
1551 /// If encrypted history sharing is enabled, will check to see if we have a
1552 /// key bundle, and import it if so.
1553 ///
1554 /// # Arguments
1555 ///
1556 /// * `room_id` - The `RoomId` of the room that was joined.
1557 /// * `pre_join_room_info` - Information about the room before we joined.
1558 async fn finish_join_room(
1559 &self,
1560 room_id: &RoomId,
1561 pre_join_room_info: Option<PreJoinRoomInfo>,
1562 ) -> Result<Room> {
1563 info!(?room_id, ?pre_join_room_info, "Completing room join");
1564 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1565 room.state() == RoomState::Invited
1566 && room.is_direct().await.unwrap_or_else(|e| {
1567 warn!(%room_id, "is_direct() failed: {e}");
1568 false
1569 })
1570 } else {
1571 false
1572 };
1573
1574 let base_room = self
1575 .base_client()
1576 .room_joined(
1577 room_id,
1578 pre_join_room_info
1579 .as_ref()
1580 .and_then(|info| info.inviter.as_ref())
1581 .map(|i| i.user_id().to_owned()),
1582 )
1583 .await?;
1584 let room = Room::new(self.clone(), base_room);
1585
1586 if mark_as_dm {
1587 room.set_is_direct(true).await?;
1588 }
1589
1590 // If we joined following an invite, check if we had previously received a key
1591 // bundle from the inviter, and import it if so.
1592 //
1593 // It's important that we only do this once `BaseClient::room_joined` has
1594 // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1595 // race.
1596 #[cfg(feature = "e2e-encryption")]
1597 if self.inner.enable_share_history_on_invite
1598 && let Some(inviter) =
1599 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1600 {
1601 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1602 .await?;
1603 }
1604
1605 // Suppress "unused variable" and "unused field" lints
1606 #[cfg(not(feature = "e2e-encryption"))]
1607 let _ = pre_join_room_info.map(|i| i.inviter);
1608
1609 Ok(room)
1610 }
1611
1612 /// Join a room by `RoomId`.
1613 ///
1614 /// Returns the `Room` in the joined state.
1615 ///
1616 /// # Arguments
1617 ///
1618 /// * `room_id` - The `RoomId` of the room to be joined.
1619 #[instrument(skip(self))]
1620 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1621 // See who invited us to this room, if anyone. Note we have to do this before
1622 // making the `/join` request, otherwise we could race against the sync.
1623 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1624
1625 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1626 let response = self.send(request).await?;
1627 self.finish_join_room(&response.room_id, pre_join_info).await
1628 }
1629
1630 /// Join a room by `RoomOrAliasId`.
1631 ///
1632 /// Returns the `Room` in the joined state.
1633 ///
1634 /// # Arguments
1635 ///
1636 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1637 /// alias looks like `#name:example.com`.
1638 /// * `server_names` - The server names to be used for resolving the alias,
1639 /// if needs be.
1640 #[instrument(skip(self))]
1641 pub async fn join_room_by_id_or_alias(
1642 &self,
1643 alias: &RoomOrAliasId,
1644 server_names: &[OwnedServerName],
1645 ) -> Result<Room> {
1646 let pre_join_info = {
1647 match alias.try_into() {
1648 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1649 Err(_) => {
1650 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1651 // responding to an invitation to the room, and therefore don't need to handle
1652 // things that happen as a result of invites.
1653 None
1654 }
1655 }
1656 };
1657 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1658 via: server_names.to_owned(),
1659 });
1660 let response = self.send(request).await?;
1661 self.finish_join_room(&response.room_id, pre_join_info).await
1662 }
1663
1664 /// Search the homeserver's directory of public rooms.
1665 ///
1666 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1667 /// a `get_public_rooms::Response`.
1668 ///
1669 /// # Arguments
1670 ///
1671 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1672 ///
1673 /// * `since` - Pagination token from a previous request.
1674 ///
1675 /// * `server` - The name of the server, if `None` the requested server is
1676 /// used.
1677 ///
1678 /// # Examples
1679 /// ```no_run
1680 /// use matrix_sdk::Client;
1681 /// # use url::Url;
1682 /// # let homeserver = Url::parse("http://example.com").unwrap();
1683 /// # let limit = Some(10);
1684 /// # let since = Some("since token");
1685 /// # let server = Some("servername.com".try_into().unwrap());
1686 /// # async {
1687 /// let mut client = Client::new(homeserver).await.unwrap();
1688 ///
1689 /// client.public_rooms(limit, since, server).await;
1690 /// # };
1691 /// ```
1692 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1693 pub async fn public_rooms(
1694 &self,
1695 limit: Option<u32>,
1696 since: Option<&str>,
1697 server: Option<&ServerName>,
1698 ) -> HttpResult<get_public_rooms::v3::Response> {
1699 let limit = limit.map(UInt::from);
1700
1701 let request = assign!(get_public_rooms::v3::Request::new(), {
1702 limit,
1703 since: since.map(ToOwned::to_owned),
1704 server: server.map(ToOwned::to_owned),
1705 });
1706 self.send(request).await
1707 }
1708
1709 /// Create a room with the given parameters.
1710 ///
1711 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1712 /// created room.
1713 ///
1714 /// If you want to create a direct message with one specific user, you can
1715 /// use [`create_dm`][Self::create_dm], which is more convenient than
1716 /// assembling the [`create_room::v3::Request`] yourself.
1717 ///
1718 /// If the `is_direct` field of the request is set to `true` and at least
1719 /// one user is invited, the room will be automatically added to the direct
1720 /// rooms in the account data.
1721 ///
1722 /// # Examples
1723 ///
1724 /// ```no_run
1725 /// use matrix_sdk::{
1726 /// Client,
1727 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1728 /// };
1729 /// # use url::Url;
1730 /// #
1731 /// # async {
1732 /// # let homeserver = Url::parse("http://example.com").unwrap();
1733 /// let request = CreateRoomRequest::new();
1734 /// let client = Client::new(homeserver).await.unwrap();
1735 /// assert!(client.create_room(request).await.is_ok());
1736 /// # };
1737 /// ```
1738 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1739 let invite = request.invite.clone();
1740 let is_direct_room = request.is_direct;
1741 let response = self.send(request).await?;
1742
1743 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1744
1745 let joined_room = Room::new(self.clone(), base_room);
1746
1747 if is_direct_room
1748 && !invite.is_empty()
1749 && let Err(error) =
1750 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1751 {
1752 // FIXME: Retry in the background
1753 error!("Failed to mark room as DM: {error}");
1754 }
1755
1756 Ok(joined_room)
1757 }
1758
1759 /// Create a DM room.
1760 ///
1761 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1762 /// given user being invited, the room marked `is_direct` and both the
1763 /// creator and invitee getting the default maximum power level.
1764 ///
1765 /// If the `e2e-encryption` feature is enabled, the room will also be
1766 /// encrypted.
1767 ///
1768 /// # Arguments
1769 ///
1770 /// * `user_id` - The ID of the user to create a DM for.
1771 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1772 #[cfg(feature = "e2e-encryption")]
1773 let initial_state = vec![
1774 InitialStateEvent::with_empty_state_key(
1775 RoomEncryptionEventContent::with_recommended_defaults(),
1776 )
1777 .to_raw_any(),
1778 ];
1779
1780 #[cfg(not(feature = "e2e-encryption"))]
1781 let initial_state = vec![];
1782
1783 let request = assign!(create_room::v3::Request::new(), {
1784 invite: vec![user_id.to_owned()],
1785 is_direct: true,
1786 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1787 initial_state,
1788 });
1789
1790 self.create_room(request).await
1791 }
1792
1793 /// Get the existing DM room with the given user, if any.
1794 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1795 let rooms = self.joined_rooms();
1796
1797 // Find the room we share with the `user_id` and only with `user_id`
1798 let room = rooms.into_iter().find(|r| {
1799 let targets = r.direct_targets();
1800 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1801 });
1802
1803 trace!(?user_id, ?room, "Found DM room with user");
1804 room
1805 }
1806
1807 /// Search the homeserver's directory for public rooms with a filter.
1808 ///
1809 /// # Arguments
1810 ///
1811 /// * `room_search` - The easiest way to create this request is using the
1812 /// `get_public_rooms_filtered::Request` itself.
1813 ///
1814 /// # Examples
1815 ///
1816 /// ```no_run
1817 /// # use url::Url;
1818 /// # use matrix_sdk::Client;
1819 /// # async {
1820 /// # let homeserver = Url::parse("http://example.com")?;
1821 /// use matrix_sdk::ruma::{
1822 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1823 /// };
1824 /// # let mut client = Client::new(homeserver).await?;
1825 ///
1826 /// let mut filter = Filter::new();
1827 /// filter.generic_search_term = Some("rust".to_owned());
1828 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1829 /// request.filter = filter;
1830 ///
1831 /// let response = client.public_rooms_filtered(request).await?;
1832 ///
1833 /// for room in response.chunk {
1834 /// println!("Found room {room:?}");
1835 /// }
1836 /// # anyhow::Ok(()) };
1837 /// ```
1838 pub async fn public_rooms_filtered(
1839 &self,
1840 request: get_public_rooms_filtered::v3::Request,
1841 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1842 self.send(request).await
1843 }
1844
1845 /// Send an arbitrary request to the server, without updating client state.
1846 ///
1847 /// **Warning:** Because this method *does not* update the client state, it
1848 /// is important to make sure that you account for this yourself, and
1849 /// use wrapper methods where available. This method should *only* be
1850 /// used if a wrapper method for the endpoint you'd like to use is not
1851 /// available.
1852 ///
1853 /// # Arguments
1854 ///
1855 /// * `request` - A filled out and valid request for the endpoint to be hit
1856 ///
1857 /// * `timeout` - An optional request timeout setting, this overrides the
1858 /// default request setting if one was set.
1859 ///
1860 /// # Examples
1861 ///
1862 /// ```no_run
1863 /// # use matrix_sdk::{Client, config::SyncSettings};
1864 /// # use url::Url;
1865 /// # async {
1866 /// # let homeserver = Url::parse("http://localhost:8080")?;
1867 /// # let mut client = Client::new(homeserver).await?;
1868 /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1869 ///
1870 /// // First construct the request you want to make
1871 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1872 /// // for all available Endpoints
1873 /// let user_id = owned_user_id!("@example:localhost");
1874 /// let request = profile::get_profile::v3::Request::new(user_id);
1875 ///
1876 /// // Start the request using Client::send()
1877 /// let response = client.send(request).await?;
1878 ///
1879 /// // Check the corresponding Response struct to find out what types are
1880 /// // returned
1881 /// # anyhow::Ok(()) };
1882 /// ```
1883 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1884 where
1885 Request: OutgoingRequest + Clone + Debug,
1886 Request::Authentication: SupportedAuthScheme,
1887 Request::PathBuilder: SupportedPathBuilder,
1888 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1889 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1890 {
1891 SendRequest {
1892 client: self.clone(),
1893 request,
1894 config: None,
1895 send_progress: Default::default(),
1896 }
1897 }
1898
1899 pub(crate) async fn send_inner<Request>(
1900 &self,
1901 request: Request,
1902 config: Option<RequestConfig>,
1903 send_progress: SharedObservable<TransmissionProgress>,
1904 ) -> HttpResult<Request::IncomingResponse>
1905 where
1906 Request: OutgoingRequest + Debug,
1907 Request::Authentication: SupportedAuthScheme,
1908 Request::PathBuilder: SupportedPathBuilder,
1909 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1910 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1911 {
1912 let homeserver = self.homeserver().to_string();
1913 let access_token = self.access_token();
1914 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1915
1916 let path_builder_input =
1917 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1918
1919 let result = self
1920 .inner
1921 .http_client
1922 .send(
1923 request,
1924 config,
1925 homeserver,
1926 access_token.as_deref(),
1927 path_builder_input,
1928 send_progress,
1929 )
1930 .await;
1931
1932 if let Err(Some(ErrorKind::UnknownToken { .. })) =
1933 result.as_ref().map_err(HttpError::client_api_error_kind)
1934 && let Some(access_token) = &access_token
1935 {
1936 // Mark the access token as expired.
1937 self.auth_ctx().set_access_token_expired(access_token);
1938 }
1939
1940 result
1941 }
1942
1943 fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
1944 _ = self
1945 .inner
1946 .auth_ctx
1947 .session_change_sender
1948 .send(SessionChange::UnknownToken(unknown_token_data.clone()));
1949 }
1950
1951 /// Fetches server versions from network; no caching.
1952 pub async fn fetch_server_versions(
1953 &self,
1954 request_config: Option<RequestConfig>,
1955 ) -> HttpResult<get_supported_versions::Response> {
1956 // Since this was called by the user, try to refresh the access token if
1957 // necessary.
1958 self.fetch_server_versions_inner(false, request_config).await
1959 }
1960
1961 /// Fetches server versions from network; no caching.
1962 ///
1963 /// If the access token is expired and `failsafe` is `false`, this will
1964 /// attempt to refresh the access token, otherwise this will try to make an
1965 /// unauthenticated request instead.
1966 pub(crate) async fn fetch_server_versions_inner(
1967 &self,
1968 failsafe: bool,
1969 request_config: Option<RequestConfig>,
1970 ) -> HttpResult<get_supported_versions::Response> {
1971 if !failsafe {
1972 // `Client::send()` handles refreshing access tokens.
1973 return self
1974 .send(get_supported_versions::Request::new())
1975 .with_request_config(request_config)
1976 .await;
1977 }
1978
1979 let homeserver = self.homeserver().to_string();
1980
1981 // If we have a fresh access token, try with it first.
1982 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
1983 && self.auth_ctx().has_valid_access_token()
1984 && let Some(access_token) = self.access_token()
1985 {
1986 let result = self
1987 .inner
1988 .http_client
1989 .send(
1990 get_supported_versions::Request::new(),
1991 request_config,
1992 homeserver.clone(),
1993 Some(&access_token),
1994 (),
1995 Default::default(),
1996 )
1997 .await;
1998
1999 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2000 result.as_ref().map_err(HttpError::client_api_error_kind)
2001 {
2002 // If the access token is actually expired, mark it as expired and fallback to
2003 // the unauthenticated request below.
2004 self.auth_ctx().set_access_token_expired(&access_token);
2005 } else {
2006 // If the request succeeded or it's an other error, just stop now.
2007 return result;
2008 }
2009 }
2010
2011 // Try without authentication.
2012 self.inner
2013 .http_client
2014 .send(
2015 get_supported_versions::Request::new(),
2016 request_config,
2017 homeserver.clone(),
2018 None,
2019 (),
2020 Default::default(),
2021 )
2022 .await
2023 }
2024
2025 /// Fetches client well_known from network; no caching.
2026 ///
2027 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2028 /// well-known contents.
2029 /// 2. If it's not, we try extracting the server name from the
2030 /// [`Client::user_id`] and building the server URL from it.
2031 /// 3. If we couldn't get the well-known contents with either the explicit
2032 /// server name or the implicit extracted one, we try the homeserver URL
2033 /// as a last resort.
2034 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2035 let homeserver = self.homeserver();
2036 let scheme = homeserver.scheme();
2037
2038 // Use the server name, either an explicit one or an implicit one taken from
2039 // the user id: sometimes we'll have only the homeserver url available and no
2040 // server name, but the server name can be extracted from the current user id.
2041 let server_url = self
2042 .server()
2043 .map(|server| server.to_string())
2044 // If the server name wasn't available, extract it from the user id and build a URL:
2045 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2046 // will be the same for the public server url, lacking a better candidate.
2047 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2048
2049 // If the server name is available, first try using it
2050 let response = if let Some(server_url) = server_url {
2051 // First try using the server name
2052 self.fetch_client_well_known_with_url(server_url).await
2053 } else {
2054 None
2055 };
2056
2057 // If we didn't get a well-known value yet, try with the homeserver url instead:
2058 if response.is_none() {
2059 // Sometimes people configure their well-known directly on the homeserver so use
2060 // this as a fallback when the server name is unknown.
2061 warn!(
2062 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2063 );
2064 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2065 } else {
2066 response
2067 }
2068 }
2069
2070 async fn fetch_client_well_known_with_url(
2071 &self,
2072 url: String,
2073 ) -> Option<discover_homeserver::Response> {
2074 let well_known = self
2075 .inner
2076 .http_client
2077 .send(
2078 discover_homeserver::Request::new(),
2079 Some(RequestConfig::short_retry()),
2080 url,
2081 None,
2082 (),
2083 Default::default(),
2084 )
2085 .await;
2086
2087 match well_known {
2088 Ok(well_known) => Some(well_known),
2089 Err(http_error) => {
2090 // It is perfectly valid to not have a well-known file.
2091 // Maybe we should check for a specific error code to be sure?
2092 warn!("Failed to fetch client well-known: {http_error}");
2093 None
2094 }
2095 }
2096 }
2097
2098 /// Load supported versions from storage, or fetch them from network and
2099 /// cache them.
2100 ///
2101 /// If `failsafe` is true, this will try to minimize side effects to avoid
2102 /// possible deadlocks.
2103 async fn load_or_fetch_supported_versions(
2104 &self,
2105 failsafe: bool,
2106 ) -> HttpResult<SupportedVersionsResponse> {
2107 match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2108 Ok(Some(stored)) => {
2109 if let Some(supported_versions) =
2110 stored.into_supported_versions().and_then(|value| value.into_data())
2111 {
2112 return Ok(supported_versions);
2113 }
2114 }
2115 Ok(None) => {
2116 // fallthrough: cache is empty
2117 }
2118 Err(err) => {
2119 warn!("error when loading cached supported versions: {err}");
2120 // fallthrough to network.
2121 }
2122 }
2123
2124 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2125 let supported_versions = SupportedVersionsResponse {
2126 versions: server_versions.versions,
2127 unstable_features: server_versions.unstable_features,
2128 };
2129
2130 // Only attempt to cache the result in storage if the request was authenticated.
2131 if self.auth_ctx().has_valid_access_token()
2132 && let Err(err) = self
2133 .state_store()
2134 .set_kv_data(
2135 StateStoreDataKey::SupportedVersions,
2136 StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2137 supported_versions.clone(),
2138 )),
2139 )
2140 .await
2141 {
2142 warn!("error when caching supported versions: {err}");
2143 }
2144
2145 Ok(supported_versions)
2146 }
2147
2148 pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2149 let supported_versions = &self.inner.caches.supported_versions;
2150
2151 if let CachedValue::Cached(val) = &*supported_versions.read().await {
2152 Some(val.clone())
2153 } else {
2154 None
2155 }
2156 }
2157
2158 /// Get the Matrix versions and features supported by the homeserver by
2159 /// fetching them from the server or the cache.
2160 ///
2161 /// This is equivalent to calling both [`Client::server_versions()`] and
2162 /// [`Client::unstable_features()`]. To always fetch the result from the
2163 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2164 /// and then `.as_supported_versions()` on the response.
2165 ///
2166 /// # Examples
2167 ///
2168 /// ```no_run
2169 /// use ruma::api::{FeatureFlag, MatrixVersion};
2170 /// # use matrix_sdk::{Client, config::SyncSettings};
2171 /// # use url::Url;
2172 /// # async {
2173 /// # let homeserver = Url::parse("http://localhost:8080")?;
2174 /// # let mut client = Client::new(homeserver).await?;
2175 ///
2176 /// let supported = client.supported_versions().await?;
2177 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2178 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2179 ///
2180 /// let msc_x_feature = FeatureFlag::from("msc_x");
2181 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2182 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2183 /// # anyhow::Ok(()) };
2184 /// ```
2185 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2186 self.supported_versions_inner(false).await
2187 }
2188
2189 /// Get the Matrix versions and features supported by the homeserver by
2190 /// fetching them from the server or the cache.
2191 ///
2192 /// If `failsafe` is true, this will try to minimize side effects to avoid
2193 /// possible deadlocks.
2194 pub(crate) async fn supported_versions_inner(
2195 &self,
2196 failsafe: bool,
2197 ) -> HttpResult<SupportedVersions> {
2198 let cached_supported_versions = &self.inner.caches.supported_versions;
2199 if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2200 return Ok(val.clone());
2201 }
2202
2203 let mut guarded_supported_versions = cached_supported_versions.write().await;
2204 if let CachedValue::Cached(val) = &*guarded_supported_versions {
2205 return Ok(val.clone());
2206 }
2207
2208 let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2209
2210 // Fill both unstable features and server versions at once.
2211 let mut supported_versions = supported.supported_versions();
2212 if supported_versions.versions.is_empty() {
2213 supported_versions.versions = [MatrixVersion::V1_0].into();
2214 }
2215
2216 // Only cache the result if the request was authenticated.
2217 if self.auth_ctx().has_valid_access_token() {
2218 *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2219 }
2220
2221 Ok(supported_versions)
2222 }
2223
2224 /// Get the Matrix versions and features supported by the homeserver by
2225 /// fetching them from the cache.
2226 ///
2227 /// For a version of this function that fetches the supported versions and
2228 /// features from the homeserver if the [`SupportedVersions`] aren't
2229 /// found in the cache, take a look at the [`Client::server_versions()`]
2230 /// method.
2231 ///
2232 /// # Examples
2233 ///
2234 /// ```no_run
2235 /// use ruma::api::{FeatureFlag, MatrixVersion};
2236 /// # use matrix_sdk::{Client, config::SyncSettings};
2237 /// # use url::Url;
2238 /// # async {
2239 /// # let homeserver = Url::parse("http://localhost:8080")?;
2240 /// # let mut client = Client::new(homeserver).await?;
2241 ///
2242 /// let supported =
2243 /// if let Some(supported) = client.supported_versions_cached().await? {
2244 /// supported
2245 /// } else {
2246 /// client.fetch_server_versions(None).await?.as_supported_versions()
2247 /// };
2248 ///
2249 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2250 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2251 ///
2252 /// let msc_x_feature = FeatureFlag::from("msc_x");
2253 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2254 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2255 /// # anyhow::Ok(()) };
2256 /// ```
2257 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2258 if let Some(cached) = self.get_cached_supported_versions().await {
2259 Ok(Some(cached))
2260 } else if let Some(stored) =
2261 self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2262 {
2263 if let Some(supported) =
2264 stored.into_supported_versions().and_then(|value| value.into_data())
2265 {
2266 Ok(Some(supported.supported_versions()))
2267 } else {
2268 Ok(None)
2269 }
2270 } else {
2271 Ok(None)
2272 }
2273 }
2274
2275 /// Get the Matrix versions supported by the homeserver by fetching them
2276 /// from the server or the cache.
2277 ///
2278 /// # Examples
2279 ///
2280 /// ```no_run
2281 /// use ruma::api::MatrixVersion;
2282 /// # use matrix_sdk::{Client, config::SyncSettings};
2283 /// # use url::Url;
2284 /// # async {
2285 /// # let homeserver = Url::parse("http://localhost:8080")?;
2286 /// # let mut client = Client::new(homeserver).await?;
2287 ///
2288 /// let server_versions = client.server_versions().await?;
2289 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2290 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2291 /// # anyhow::Ok(()) };
2292 /// ```
2293 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2294 Ok(self.supported_versions().await?.versions)
2295 }
2296
2297 /// Get the unstable features supported by the homeserver by fetching them
2298 /// from the server or the cache.
2299 ///
2300 /// # Examples
2301 ///
2302 /// ```no_run
2303 /// use matrix_sdk::ruma::api::FeatureFlag;
2304 /// # use matrix_sdk::{Client, config::SyncSettings};
2305 /// # use url::Url;
2306 /// # async {
2307 /// # let homeserver = Url::parse("http://localhost:8080")?;
2308 /// # let mut client = Client::new(homeserver).await?;
2309 ///
2310 /// let msc_x_feature = FeatureFlag::from("msc_x");
2311 /// let unstable_features = client.unstable_features().await?;
2312 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2313 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2314 /// # anyhow::Ok(()) };
2315 /// ```
2316 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2317 Ok(self.supported_versions().await?.features)
2318 }
2319
2320 /// Empty the supported versions and unstable features cache.
2321 ///
2322 /// Since the SDK caches the supported versions, it's possible to have a
2323 /// stale entry in the cache. This functions makes it possible to force
2324 /// reset it.
2325 pub async fn reset_supported_versions(&self) -> Result<()> {
2326 // Empty the in-memory caches.
2327 self.inner.caches.supported_versions.write().await.take();
2328
2329 // Empty the store cache.
2330 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2331 }
2332
2333 /// Load well-known from storage, or fetch it from network and cache it.
2334 async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2335 match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2336 Ok(Some(stored)) => {
2337 if let Some(well_known) =
2338 stored.into_well_known().and_then(|value| value.into_data())
2339 {
2340 return Ok(well_known);
2341 }
2342 }
2343 Ok(None) => {
2344 // fallthrough: cache is empty
2345 }
2346 Err(err) => {
2347 warn!("error when loading cached well-known: {err}");
2348 // fallthrough to network.
2349 }
2350 }
2351
2352 let well_known = self.fetch_client_well_known().await.map(Into::into);
2353
2354 // Attempt to cache the result in storage.
2355 if let Err(err) = self
2356 .state_store()
2357 .set_kv_data(
2358 StateStoreDataKey::WellKnown,
2359 StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2360 )
2361 .await
2362 {
2363 warn!("error when caching well-known: {err}");
2364 }
2365
2366 Ok(well_known)
2367 }
2368
2369 async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2370 let well_known = &self.inner.caches.well_known;
2371 if let CachedValue::Cached(val) = &*well_known.read().await {
2372 return Ok(val.clone());
2373 }
2374
2375 let mut guarded_well_known = well_known.write().await;
2376 if let CachedValue::Cached(val) = &*guarded_well_known {
2377 return Ok(val.clone());
2378 }
2379
2380 let well_known = self.load_or_fetch_well_known().await?;
2381
2382 *guarded_well_known = CachedValue::Cached(well_known.clone());
2383
2384 Ok(well_known)
2385 }
2386
2387 /// Get information about the homeserver's advertised RTC foci by fetching
2388 /// the well-known file from the server or the cache.
2389 ///
2390 /// # Examples
2391 /// ```no_run
2392 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2393 /// # use url::Url;
2394 /// # async {
2395 /// # let homeserver = Url::parse("http://localhost:8080")?;
2396 /// # let mut client = Client::new(homeserver).await?;
2397 /// let rtc_foci = client.rtc_foci().await?;
2398 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2399 /// RtcFocusInfo::LiveKit(info) => Some(info),
2400 /// _ => None,
2401 /// });
2402 /// if let Some(info) = default_livekit_focus_info {
2403 /// println!("Default LiveKit service URL: {}", info.service_url);
2404 /// }
2405 /// # anyhow::Ok(()) };
2406 /// ```
2407 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2408 let well_known = self.get_or_load_and_cache_well_known().await?;
2409
2410 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2411 }
2412
2413 /// Empty the well-known cache.
2414 ///
2415 /// Since the SDK caches the well-known, it's possible to have a stale entry
2416 /// in the cache. This functions makes it possible to force reset it.
2417 pub async fn reset_well_known(&self) -> Result<()> {
2418 // Empty the in-memory caches.
2419 self.inner.caches.well_known.write().await.take();
2420
2421 // Empty the store cache.
2422 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2423 }
2424
2425 /// Check whether MSC 4028 is enabled on the homeserver.
2426 ///
2427 /// # Examples
2428 ///
2429 /// ```no_run
2430 /// # use matrix_sdk::{Client, config::SyncSettings};
2431 /// # use url::Url;
2432 /// # async {
2433 /// # let homeserver = Url::parse("http://localhost:8080")?;
2434 /// # let mut client = Client::new(homeserver).await?;
2435 /// let msc4028_enabled =
2436 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2437 /// # anyhow::Ok(()) };
2438 /// ```
2439 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2440 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2441 }
2442
2443 /// Get information of all our own devices.
2444 ///
2445 /// # Examples
2446 ///
2447 /// ```no_run
2448 /// # use matrix_sdk::{Client, config::SyncSettings};
2449 /// # use url::Url;
2450 /// # async {
2451 /// # let homeserver = Url::parse("http://localhost:8080")?;
2452 /// # let mut client = Client::new(homeserver).await?;
2453 /// let response = client.devices().await?;
2454 ///
2455 /// for device in response.devices {
2456 /// println!(
2457 /// "Device: {} {}",
2458 /// device.device_id,
2459 /// device.display_name.as_deref().unwrap_or("")
2460 /// );
2461 /// }
2462 /// # anyhow::Ok(()) };
2463 /// ```
2464 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2465 let request = get_devices::v3::Request::new();
2466
2467 self.send(request).await
2468 }
2469
2470 /// Delete the given devices from the server.
2471 ///
2472 /// # Arguments
2473 ///
2474 /// * `devices` - The list of devices that should be deleted from the
2475 /// server.
2476 ///
2477 /// * `auth_data` - This request requires user interactive auth, the first
2478 /// request needs to set this to `None` and will always fail with an
2479 /// `UiaaResponse`. The response will contain information for the
2480 /// interactive auth and the same request needs to be made but this time
2481 /// with some `auth_data` provided.
2482 ///
2483 /// ```no_run
2484 /// # use matrix_sdk::{
2485 /// # ruma::{api::client::uiaa, owned_device_id},
2486 /// # Client, Error, config::SyncSettings,
2487 /// # };
2488 /// # use serde_json::json;
2489 /// # use url::Url;
2490 /// # use std::collections::BTreeMap;
2491 /// # async {
2492 /// # let homeserver = Url::parse("http://localhost:8080")?;
2493 /// # let mut client = Client::new(homeserver).await?;
2494 /// let devices = &[owned_device_id!("DEVICEID")];
2495 ///
2496 /// if let Err(e) = client.delete_devices(devices, None).await {
2497 /// if let Some(info) = e.as_uiaa_response() {
2498 /// let mut password = uiaa::Password::new(
2499 /// uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2500 /// "wordpass".to_owned(),
2501 /// );
2502 /// password.session = info.session.clone();
2503 ///
2504 /// client
2505 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2506 /// .await?;
2507 /// }
2508 /// }
2509 /// # anyhow::Ok(()) };
2510 pub async fn delete_devices(
2511 &self,
2512 devices: &[OwnedDeviceId],
2513 auth_data: Option<uiaa::AuthData>,
2514 ) -> HttpResult<delete_devices::v3::Response> {
2515 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2516 request.auth = auth_data;
2517
2518 self.send(request).await
2519 }
2520
2521 /// Change the display name of a device owned by the current user.
2522 ///
2523 /// Returns a `update_device::Response` which specifies the result
2524 /// of the operation.
2525 ///
2526 /// # Arguments
2527 ///
2528 /// * `device_id` - The ID of the device to change the display name of.
2529 /// * `display_name` - The new display name to set.
2530 pub async fn rename_device(
2531 &self,
2532 device_id: &DeviceId,
2533 display_name: &str,
2534 ) -> HttpResult<update_device::v3::Response> {
2535 let mut request = update_device::v3::Request::new(device_id.to_owned());
2536 request.display_name = Some(display_name.to_owned());
2537
2538 self.send(request).await
2539 }
2540
2541 /// Check whether a device with a specific ID exists on the server.
2542 ///
2543 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2544 /// with 404 and the underlying error otherwise.
2545 ///
2546 /// # Arguments
2547 ///
2548 /// * `device_id` - The ID of the device to query.
2549 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2550 let request = device::get_device::v3::Request::new(device_id);
2551 match self.send(request).await {
2552 Ok(_) => Ok(true),
2553 Err(err) => {
2554 if let Some(error) = err.as_client_api_error()
2555 && error.status_code == 404
2556 {
2557 Ok(false)
2558 } else {
2559 Err(err.into())
2560 }
2561 }
2562 }
2563 }
2564
2565 /// Synchronize the client's state with the latest state on the server.
2566 ///
2567 /// ## Syncing Events
2568 ///
2569 /// Messages or any other type of event need to be periodically fetched from
2570 /// the server, this is achieved by sending a `/sync` request to the server.
2571 ///
2572 /// The first sync is sent out without a [`token`]. The response of the
2573 /// first sync will contain a [`next_batch`] field which should then be
2574 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2575 /// don't receive the same events multiple times.
2576 ///
2577 /// ## Long Polling
2578 ///
2579 /// A sync should in the usual case always be in flight. The
2580 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2581 /// long the server will wait for new events before it will respond.
2582 /// The server will respond immediately if some new events arrive before the
2583 /// timeout has expired. If no changes arrive and the timeout expires an
2584 /// empty sync response will be sent to the client.
2585 ///
2586 /// This method of sending a request that may not receive a response
2587 /// immediately is called long polling.
2588 ///
2589 /// ## Filtering Events
2590 ///
2591 /// The number or type of messages and events that the client should receive
2592 /// from the server can be altered using a [`Filter`].
2593 ///
2594 /// Filters can be non-trivial and, since they will be sent with every sync
2595 /// request, they may take up a bunch of unnecessary bandwidth.
2596 ///
2597 /// Luckily filters can be uploaded to the server and reused using an unique
2598 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2599 /// method.
2600 ///
2601 /// # Arguments
2602 ///
2603 /// * `sync_settings` - Settings for the sync call, this allows us to set
2604 /// various options to configure the sync:
2605 /// * [`filter`] - To configure which events we receive and which get
2606 /// [filtered] by the server
2607 /// * [`timeout`] - To configure our [long polling] setup.
2608 /// * [`token`] - To tell the server which events we already received
2609 /// and where we wish to continue syncing.
2610 /// * [`full_state`] - To tell the server that we wish to receive all
2611 /// state events, regardless of our configured [`token`].
2612 /// * [`set_presence`] - To tell the server to set the presence and to
2613 /// which state.
2614 ///
2615 /// # Examples
2616 ///
2617 /// ```no_run
2618 /// # use url::Url;
2619 /// # async {
2620 /// # let homeserver = Url::parse("http://localhost:8080")?;
2621 /// # let username = "";
2622 /// # let password = "";
2623 /// use matrix_sdk::{
2624 /// Client, config::SyncSettings,
2625 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2626 /// };
2627 ///
2628 /// let client = Client::new(homeserver).await?;
2629 /// client.matrix_auth().login_username(username, password).send().await?;
2630 ///
2631 /// // Sync once so we receive the client state and old messages.
2632 /// client.sync_once(SyncSettings::default()).await?;
2633 ///
2634 /// // Register our handler so we start responding once we receive a new
2635 /// // event.
2636 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2637 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2638 /// });
2639 ///
2640 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2641 /// // from our `sync_once()` call automatically.
2642 /// client.sync(SyncSettings::default()).await;
2643 /// # anyhow::Ok(()) };
2644 /// ```
2645 ///
2646 /// [`sync`]: #method.sync
2647 /// [`SyncSettings`]: crate::config::SyncSettings
2648 /// [`token`]: crate::config::SyncSettings#method.token
2649 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2650 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2651 /// [`set_presence`]: ruma::presence::PresenceState
2652 /// [`filter`]: crate::config::SyncSettings#method.filter
2653 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2654 /// [`next_batch`]: SyncResponse#structfield.next_batch
2655 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2656 /// [long polling]: #long-polling
2657 /// [filtered]: #filtering-events
2658 #[instrument(skip(self))]
2659 pub async fn sync_once(
2660 &self,
2661 sync_settings: crate::config::SyncSettings,
2662 ) -> Result<SyncResponse> {
2663 // The sync might not return for quite a while due to the timeout.
2664 // We'll see if there's anything crypto related to send out before we
2665 // sync, i.e. if we closed our client after a sync but before the
2666 // crypto requests were sent out.
2667 //
2668 // This will mostly be a no-op.
2669 #[cfg(feature = "e2e-encryption")]
2670 if let Err(e) = self.send_outgoing_requests().await {
2671 error!(error = ?e, "Error while sending outgoing E2EE requests");
2672 }
2673
2674 let token = match sync_settings.token {
2675 SyncToken::Specific(token) => Some(token),
2676 SyncToken::NoToken => None,
2677 SyncToken::ReusePrevious => self.sync_token().await,
2678 };
2679
2680 let request = assign!(sync_events::v3::Request::new(), {
2681 filter: sync_settings.filter.map(|f| *f),
2682 since: token,
2683 full_state: sync_settings.full_state,
2684 set_presence: sync_settings.set_presence,
2685 timeout: sync_settings.timeout,
2686 use_state_after: true,
2687 });
2688 let mut request_config = self.request_config();
2689 if let Some(timeout) = sync_settings.timeout {
2690 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2691 request_config.timeout = Some(base_timeout + timeout);
2692 }
2693
2694 let response = self.send(request).with_request_config(request_config).await?;
2695 let next_batch = response.next_batch.clone();
2696 let response = self.process_sync(response).await?;
2697
2698 #[cfg(feature = "e2e-encryption")]
2699 if let Err(e) = self.send_outgoing_requests().await {
2700 error!(error = ?e, "Error while sending outgoing E2EE requests");
2701 }
2702
2703 self.inner.sync_beat.notify(usize::MAX);
2704
2705 Ok(SyncResponse::new(next_batch, response))
2706 }
2707
2708 /// Repeatedly synchronize the client state with the server.
2709 ///
2710 /// This method will only return on error, if cancellation is needed
2711 /// the method should be wrapped in a cancelable task or the
2712 /// [`Client::sync_with_callback`] method can be used or
2713 /// [`Client::sync_with_result_callback`] if you want to handle error
2714 /// cases in the loop, too.
2715 ///
2716 /// This method will internally call [`Client::sync_once`] in a loop.
2717 ///
2718 /// This method can be used with the [`Client::add_event_handler`]
2719 /// method to react to individual events. If you instead wish to handle
2720 /// events in a bulk manner the [`Client::sync_with_callback`],
2721 /// [`Client::sync_with_result_callback`] and
2722 /// [`Client::sync_stream`] methods can be used instead. Those methods
2723 /// repeatedly return the whole sync response.
2724 ///
2725 /// # Arguments
2726 ///
2727 /// * `sync_settings` - Settings for the sync call. *Note* that those
2728 /// settings will be only used for the first sync call. See the argument
2729 /// docs for [`Client::sync_once`] for more info.
2730 ///
2731 /// # Return
2732 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2733 /// up to the user of the API to check the error and decide whether the sync
2734 /// should continue or not.
2735 ///
2736 /// # Examples
2737 ///
2738 /// ```no_run
2739 /// # use url::Url;
2740 /// # async {
2741 /// # let homeserver = Url::parse("http://localhost:8080")?;
2742 /// # let username = "";
2743 /// # let password = "";
2744 /// use matrix_sdk::{
2745 /// Client, config::SyncSettings,
2746 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2747 /// };
2748 ///
2749 /// let client = Client::new(homeserver).await?;
2750 /// client.matrix_auth().login_username(&username, &password).send().await?;
2751 ///
2752 /// // Register our handler so we start responding once we receive a new
2753 /// // event.
2754 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2755 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2756 /// });
2757 ///
2758 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2759 /// // automatically.
2760 /// client.sync(SyncSettings::default()).await?;
2761 /// # anyhow::Ok(()) };
2762 /// ```
2763 ///
2764 /// [argument docs]: #method.sync_once
2765 /// [`sync_with_callback`]: #method.sync_with_callback
2766 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2767 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2768 }
2769
2770 /// Repeatedly call sync to synchronize the client state with the server.
2771 ///
2772 /// # Arguments
2773 ///
2774 /// * `sync_settings` - Settings for the sync call. *Note* that those
2775 /// settings will be only used for the first sync call. See the argument
2776 /// docs for [`Client::sync_once`] for more info.
2777 ///
2778 /// * `callback` - A callback that will be called every time a successful
2779 /// response has been fetched from the server. The callback must return a
2780 /// boolean which signalizes if the method should stop syncing. If the
2781 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2782 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2783 ///
2784 /// # Return
2785 /// The sync runs until an error occurs or the
2786 /// callback indicates that the Loop should stop. If the callback asked for
2787 /// a regular stop, the result will be `Ok(())` otherwise the
2788 /// `Err(Error)` is returned.
2789 ///
2790 /// # Examples
2791 ///
2792 /// The following example demonstrates how to sync forever while sending all
2793 /// the interesting events through a mpsc channel to another thread e.g. a
2794 /// UI thread.
2795 ///
2796 /// ```no_run
2797 /// # use std::time::Duration;
2798 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2799 /// # use url::Url;
2800 /// # async {
2801 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2802 /// # let mut client = Client::new(homeserver).await.unwrap();
2803 ///
2804 /// use tokio::sync::mpsc::channel;
2805 ///
2806 /// let (tx, rx) = channel(100);
2807 ///
2808 /// let sync_channel = &tx;
2809 /// let sync_settings = SyncSettings::new()
2810 /// .timeout(Duration::from_secs(30));
2811 ///
2812 /// client
2813 /// .sync_with_callback(sync_settings, |response| async move {
2814 /// let channel = sync_channel;
2815 /// for (room_id, room) in response.rooms.joined {
2816 /// for event in room.timeline.events {
2817 /// channel.send(event).await.unwrap();
2818 /// }
2819 /// }
2820 ///
2821 /// LoopCtrl::Continue
2822 /// })
2823 /// .await;
2824 /// };
2825 /// ```
2826 #[instrument(skip_all)]
2827 pub async fn sync_with_callback<C>(
2828 &self,
2829 sync_settings: crate::config::SyncSettings,
2830 callback: impl Fn(SyncResponse) -> C,
2831 ) -> Result<(), Error>
2832 where
2833 C: Future<Output = LoopCtrl>,
2834 {
2835 self.sync_with_result_callback(sync_settings, |result| async {
2836 Ok(callback(result?).await)
2837 })
2838 .await
2839 }
2840
2841 /// Repeatedly call sync to synchronize the client state with the server.
2842 ///
2843 /// # Arguments
2844 ///
2845 /// * `sync_settings` - Settings for the sync call. *Note* that those
2846 /// settings will be only used for the first sync call. See the argument
2847 /// docs for [`Client::sync_once`] for more info.
2848 ///
2849 /// * `callback` - A callback that will be called every time after a
2850 /// response has been received, failure or not. The callback returns a
2851 /// `Result<LoopCtrl, Error>`, too. When returning
2852 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2853 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2854 /// function returns `Ok(())`. In case the callback can't handle the
2855 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2856 /// which results in the sync ending and the `Err(Error)` being returned.
2857 ///
2858 /// # Return
2859 /// The sync runs until an error occurs that the callback can't handle or
2860 /// the callback indicates that the Loop should stop. If the callback
2861 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2862 /// `Err(Error)` is returned.
2863 ///
2864 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2865 /// this, and are handled first without sending the result to the
2866 /// callback. Only after they have exceeded is the `Result` handed to
2867 /// the callback.
2868 ///
2869 /// # Examples
2870 ///
2871 /// The following example demonstrates how to sync forever while sending all
2872 /// the interesting events through a mpsc channel to another thread e.g. a
2873 /// UI thread.
2874 ///
2875 /// ```no_run
2876 /// # use std::time::Duration;
2877 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2878 /// # use url::Url;
2879 /// # async {
2880 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2881 /// # let mut client = Client::new(homeserver).await.unwrap();
2882 /// #
2883 /// use tokio::sync::mpsc::channel;
2884 ///
2885 /// let (tx, rx) = channel(100);
2886 ///
2887 /// let sync_channel = &tx;
2888 /// let sync_settings = SyncSettings::new()
2889 /// .timeout(Duration::from_secs(30));
2890 ///
2891 /// client
2892 /// .sync_with_result_callback(sync_settings, |response| async move {
2893 /// let channel = sync_channel;
2894 /// let sync_response = response?;
2895 /// for (room_id, room) in sync_response.rooms.joined {
2896 /// for event in room.timeline.events {
2897 /// channel.send(event).await.unwrap();
2898 /// }
2899 /// }
2900 ///
2901 /// Ok(LoopCtrl::Continue)
2902 /// })
2903 /// .await;
2904 /// };
2905 /// ```
2906 #[instrument(skip(self, callback))]
2907 pub async fn sync_with_result_callback<C>(
2908 &self,
2909 sync_settings: crate::config::SyncSettings,
2910 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2911 ) -> Result<(), Error>
2912 where
2913 C: Future<Output = Result<LoopCtrl, Error>>,
2914 {
2915 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2916
2917 while let Some(result) = sync_stream.next().await {
2918 trace!("Running callback");
2919 if callback(result).await? == LoopCtrl::Break {
2920 trace!("Callback told us to stop");
2921 break;
2922 }
2923 trace!("Done running callback");
2924 }
2925
2926 Ok(())
2927 }
2928
2929 //// Repeatedly synchronize the client state with the server.
2930 ///
2931 /// This method will internally call [`Client::sync_once`] in a loop and is
2932 /// equivalent to the [`Client::sync`] method but the responses are provided
2933 /// as an async stream.
2934 ///
2935 /// # Arguments
2936 ///
2937 /// * `sync_settings` - Settings for the sync call. *Note* that those
2938 /// settings will be only used for the first sync call. See the argument
2939 /// docs for [`Client::sync_once`] for more info.
2940 ///
2941 /// # Examples
2942 ///
2943 /// ```no_run
2944 /// # use url::Url;
2945 /// # async {
2946 /// # let homeserver = Url::parse("http://localhost:8080")?;
2947 /// # let username = "";
2948 /// # let password = "";
2949 /// use futures_util::StreamExt;
2950 /// use matrix_sdk::{Client, config::SyncSettings};
2951 ///
2952 /// let client = Client::new(homeserver).await?;
2953 /// client.matrix_auth().login_username(&username, &password).send().await?;
2954 ///
2955 /// let mut sync_stream =
2956 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2957 ///
2958 /// while let Some(Ok(response)) = sync_stream.next().await {
2959 /// for room in response.rooms.joined.values() {
2960 /// for e in &room.timeline.events {
2961 /// if let Ok(event) = e.raw().deserialize() {
2962 /// println!("Received event {:?}", event);
2963 /// }
2964 /// }
2965 /// }
2966 /// }
2967 ///
2968 /// # anyhow::Ok(()) };
2969 /// ```
2970 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2971 #[instrument(skip(self))]
2972 pub async fn sync_stream(
2973 &self,
2974 mut sync_settings: crate::config::SyncSettings,
2975 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2976 let mut is_first_sync = true;
2977 let mut timeout = None;
2978 let mut last_sync_time: Option<Instant> = None;
2979
2980 let parent_span = Span::current();
2981
2982 async_stream::stream!({
2983 loop {
2984 trace!("Syncing");
2985
2986 if sync_settings.ignore_timeout_on_first_sync {
2987 if is_first_sync {
2988 timeout = sync_settings.timeout.take();
2989 } else if sync_settings.timeout.is_none() && timeout.is_some() {
2990 sync_settings.timeout = timeout.take();
2991 }
2992
2993 is_first_sync = false;
2994 }
2995
2996 yield self
2997 .sync_loop_helper(&mut sync_settings)
2998 .instrument(parent_span.clone())
2999 .await;
3000
3001 Client::delay_sync(&mut last_sync_time).await
3002 }
3003 })
3004 }
3005
3006 /// Get the current, if any, sync token of the client.
3007 /// This will be None if the client didn't sync at least once.
3008 pub(crate) async fn sync_token(&self) -> Option<String> {
3009 self.inner.base_client.sync_token().await
3010 }
3011
3012 /// Gets information about the owner of a given access token.
3013 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3014 let request = whoami::v3::Request::new();
3015 self.send(request).await
3016 }
3017
3018 /// Subscribes a new receiver to client SessionChange broadcasts.
3019 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3020 let broadcast = &self.auth_ctx().session_change_sender;
3021 broadcast.subscribe()
3022 }
3023
3024 /// Sets the save/restore session callbacks.
3025 ///
3026 /// This is another mechanism to get synchronous updates to session tokens,
3027 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3028 pub fn set_session_callbacks(
3029 &self,
3030 reload_session_callback: Box<ReloadSessionCallback>,
3031 save_session_callback: Box<SaveSessionCallback>,
3032 ) -> Result<()> {
3033 self.inner
3034 .auth_ctx
3035 .reload_session_callback
3036 .set(reload_session_callback)
3037 .map_err(|_| Error::MultipleSessionCallbacks)?;
3038
3039 self.inner
3040 .auth_ctx
3041 .save_session_callback
3042 .set(save_session_callback)
3043 .map_err(|_| Error::MultipleSessionCallbacks)?;
3044
3045 Ok(())
3046 }
3047
3048 /// Get the notification settings of the current owner of the client.
3049 pub async fn notification_settings(&self) -> NotificationSettings {
3050 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3051 NotificationSettings::new(self.clone(), ruleset)
3052 }
3053
3054 /// Create a new specialized `Client` that can process notifications.
3055 ///
3056 /// See [`CrossProcessLock::new`] to learn more about
3057 /// `cross_process_lock_config`.
3058 ///
3059 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3060 pub async fn notification_client(
3061 &self,
3062 cross_process_lock_config: CrossProcessLockConfig,
3063 ) -> Result<Client> {
3064 let client = Client {
3065 inner: ClientInner::new(
3066 self.inner.auth_ctx.clone(),
3067 self.server().cloned(),
3068 self.homeserver(),
3069 self.sliding_sync_version(),
3070 self.inner.http_client.clone(),
3071 self.inner
3072 .base_client
3073 .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3074 .await?,
3075 self.inner.caches.supported_versions.read().await.clone(),
3076 self.inner.caches.well_known.read().await.clone(),
3077 self.inner.respect_login_well_known,
3078 self.inner.event_cache.clone(),
3079 self.inner.send_queue_data.clone(),
3080 self.inner.latest_events.clone(),
3081 #[cfg(feature = "e2e-encryption")]
3082 self.inner.e2ee.encryption_settings,
3083 #[cfg(feature = "e2e-encryption")]
3084 self.inner.enable_share_history_on_invite,
3085 cross_process_lock_config,
3086 #[cfg(feature = "experimental-search")]
3087 self.inner.search_index.clone(),
3088 self.inner.thread_subscription_catchup.clone(),
3089 )
3090 .await,
3091 };
3092
3093 Ok(client)
3094 }
3095
3096 /// The [`EventCache`] instance for this [`Client`].
3097 pub fn event_cache(&self) -> &EventCache {
3098 // SAFETY: always initialized in the `Client` ctor.
3099 self.inner.event_cache.get().unwrap()
3100 }
3101
3102 /// The [`LatestEvents`] instance for this [`Client`].
3103 pub async fn latest_events(&self) -> &LatestEvents {
3104 self.inner
3105 .latest_events
3106 .get_or_init(|| async {
3107 LatestEvents::new(
3108 WeakClient::from_client(self),
3109 self.event_cache().clone(),
3110 SendQueue::new(self.clone()),
3111 self.room_info_notable_update_receiver(),
3112 )
3113 })
3114 .await
3115 }
3116
3117 /// Waits until an at least partially synced room is received, and returns
3118 /// it.
3119 ///
3120 /// **Note: this function will loop endlessly until either it finds the room
3121 /// or an externally set timeout happens.**
3122 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3123 loop {
3124 if let Some(room) = self.get_room(room_id) {
3125 if room.is_state_partially_or_fully_synced() {
3126 debug!("Found just created room!");
3127 return room;
3128 }
3129 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3130 } else {
3131 debug!("Room wasn't found, waiting for sync beat to try again");
3132 }
3133 self.inner.sync_beat.listen().await;
3134 }
3135 }
3136
3137 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3138 /// join it.
3139 pub async fn knock(
3140 &self,
3141 room_id_or_alias: OwnedRoomOrAliasId,
3142 reason: Option<String>,
3143 server_names: Vec<OwnedServerName>,
3144 ) -> Result<Room> {
3145 let request =
3146 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3147 let response = self.send(request).await?;
3148 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3149 Ok(Room::new(self.clone(), base_room))
3150 }
3151
3152 /// Checks whether the provided `user_id` belongs to an ignored user.
3153 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3154 self.base_client().is_user_ignored(user_id).await
3155 }
3156
3157 /// Gets the `max_upload_size` value from the homeserver, getting either a
3158 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3159 /// missing.
3160 ///
3161 /// Check the spec for more info:
3162 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3163 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3164 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3165 if let Some(data) = max_upload_size_lock.get() {
3166 return Ok(data.to_owned());
3167 }
3168
3169 // Use the authenticated endpoint when the server supports it.
3170 let supported_versions = self.supported_versions().await?;
3171 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3172 .is_supported(&supported_versions);
3173
3174 let upload_size = if use_auth {
3175 self.send(authenticated_media::get_media_config::v1::Request::default())
3176 .await?
3177 .upload_size
3178 } else {
3179 #[allow(deprecated)]
3180 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3181 };
3182
3183 match max_upload_size_lock.set(upload_size) {
3184 Ok(_) => Ok(upload_size),
3185 Err(error) => {
3186 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3187 }
3188 }
3189 }
3190
3191 /// The settings to use for decrypting events.
3192 #[cfg(feature = "e2e-encryption")]
3193 pub fn decryption_settings(&self) -> &DecryptionSettings {
3194 &self.base_client().decryption_settings
3195 }
3196
3197 /// Returns the [`SearchIndex`] for this [`Client`].
3198 #[cfg(feature = "experimental-search")]
3199 pub fn search_index(&self) -> &SearchIndex {
3200 &self.inner.search_index
3201 }
3202
3203 /// Whether the client is configured to take thread subscriptions (MSC4306
3204 /// and MSC4308) into account, and the server enabled the experimental
3205 /// feature flag for it.
3206 ///
3207 /// This may cause filtering out of thread subscriptions, and loading the
3208 /// thread subscriptions via the sliding sync extension, when the room
3209 /// list service is being used.
3210 ///
3211 /// This is async and fallible as it may use the network to retrieve the
3212 /// server supported features, if they aren't cached already.
3213 pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3214 // Check if the client is configured to support thread subscriptions first.
3215 match self.base_client().threading_support {
3216 ThreadingSupport::Enabled { with_subscriptions: false }
3217 | ThreadingSupport::Disabled => return Ok(false),
3218 ThreadingSupport::Enabled { with_subscriptions: true } => {}
3219 }
3220
3221 // Now, let's check that the server supports it.
3222 let server_enabled = self
3223 .supported_versions()
3224 .await?
3225 .features
3226 .contains(&FeatureFlag::from("org.matrix.msc4306"));
3227
3228 Ok(server_enabled)
3229 }
3230
3231 /// Fetch thread subscriptions changes between `from` and up to `to`.
3232 ///
3233 /// The `limit` optional parameter can be used to limit the number of
3234 /// entries in a response. It can also be overridden by the server, if
3235 /// it's deemed too large.
3236 pub async fn fetch_thread_subscriptions(
3237 &self,
3238 from: Option<String>,
3239 to: Option<String>,
3240 limit: Option<UInt>,
3241 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3242 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3243 from,
3244 to,
3245 limit,
3246 });
3247 Ok(self.send(request).await?)
3248 }
3249
3250 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3251 self.inner.thread_subscription_catchup.get().unwrap()
3252 }
3253
3254 /// Perform database optimizations if any are available, i.e. vacuuming in
3255 /// SQLite.
3256 ///
3257 /// **Warning:** this was added to check if SQLite fragmentation was the
3258 /// source of performance issues, **DO NOT use in production**.
3259 #[doc(hidden)]
3260 pub async fn optimize_stores(&self) -> Result<()> {
3261 trace!("Optimizing state store...");
3262 self.state_store().optimize().await?;
3263
3264 trace!("Optimizing event cache store...");
3265 if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3266 clean_lock.optimize().await?;
3267 }
3268
3269 trace!("Optimizing media store...");
3270 self.media_store().lock().await?.optimize().await?;
3271
3272 Ok(())
3273 }
3274
3275 /// Returns the sizes of the existing stores, if known.
3276 pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3277 #[cfg(feature = "e2e-encryption")]
3278 let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3279 && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3280 {
3281 Some(store_size)
3282 } else {
3283 None
3284 };
3285 #[cfg(not(feature = "e2e-encryption"))]
3286 let crypto_store_size = None;
3287
3288 let state_store_size = self.state_store().get_size().await.ok().flatten();
3289
3290 let event_cache_store_size = if let Some(clean_lock) =
3291 self.event_cache_store().lock().await?.as_clean()
3292 && let Ok(Some(store_size)) = clean_lock.get_size().await
3293 {
3294 Some(store_size)
3295 } else {
3296 None
3297 };
3298
3299 let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3300
3301 Ok(StoreSizes {
3302 crypto_store: crypto_store_size,
3303 state_store: state_store_size,
3304 event_cache_store: event_cache_store_size,
3305 media_store: media_store_size,
3306 })
3307 }
3308
3309 /// Get a reference to the client's task monitor, for spawning background
3310 /// tasks.
3311 pub fn task_monitor(&self) -> &TaskMonitor {
3312 &self.inner.task_monitor
3313 }
3314
3315 /// Add a subscriber for duplicate key upload error notifications triggered
3316 /// by requests to /keys/upload.
3317 #[cfg(feature = "e2e-encryption")]
3318 pub fn subscribe_to_duplicate_key_upload_errors(
3319 &self,
3320 ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3321 self.inner.duplicate_key_upload_error_sender.subscribe()
3322 }
3323
3324 /// Check the record of whether we are waiting for an [MSC4268] key bundle
3325 /// for the given room.
3326 ///
3327 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3328 #[cfg(feature = "e2e-encryption")]
3329 pub async fn get_pending_key_bundle_details_for_room(
3330 &self,
3331 room_id: &RoomId,
3332 ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3333 Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3334 }
3335}
3336
3337/// Contains the disk size of the different stores, if known. It won't be
3338/// available for in-memory stores.
3339#[derive(Debug, Clone)]
3340pub struct StoreSizes {
3341 /// The size of the CryptoStore.
3342 pub crypto_store: Option<usize>,
3343 /// The size of the StateStore.
3344 pub state_store: Option<usize>,
3345 /// The size of the EventCacheStore.
3346 pub event_cache_store: Option<usize>,
3347 /// The size of the MediaStore.
3348 pub media_store: Option<usize>,
3349}
3350
3351#[cfg(any(feature = "testing", test))]
3352impl Client {
3353 /// Test helper to mark users as tracked by the crypto layer.
3354 #[cfg(feature = "e2e-encryption")]
3355 pub async fn update_tracked_users_for_testing(
3356 &self,
3357 user_ids: impl IntoIterator<Item = &UserId>,
3358 ) {
3359 let olm = self.olm_machine().await;
3360 let olm = olm.as_ref().unwrap();
3361 olm.update_tracked_users(user_ids).await.unwrap();
3362 }
3363}
3364
3365/// A weak reference to the inner client, useful when trying to get a handle
3366/// on the owning client.
3367#[derive(Clone, Debug)]
3368pub(crate) struct WeakClient {
3369 client: Weak<ClientInner>,
3370}
3371
3372impl WeakClient {
3373 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3374 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3375 Self { client: Arc::downgrade(client) }
3376 }
3377
3378 /// Construct a [`WeakClient`] from a [`Client`].
3379 pub fn from_client(client: &Client) -> Self {
3380 Self::from_inner(&client.inner)
3381 }
3382
3383 /// Attempts to get a [`Client`] from this [`WeakClient`].
3384 pub fn get(&self) -> Option<Client> {
3385 self.client.upgrade().map(|inner| Client { inner })
3386 }
3387
3388 /// Gets the number of strong (`Arc`) pointers still pointing to this
3389 /// client.
3390 #[allow(dead_code)]
3391 pub fn strong_count(&self) -> usize {
3392 self.client.strong_count()
3393 }
3394}
3395
3396/// Information about the state of a room before we joined it.
3397#[derive(Debug, Clone, Default)]
3398struct PreJoinRoomInfo {
3399 /// The user who invited us to the room, if any.
3400 pub inviter: Option<RoomMember>,
3401}
3402
3403// The http mocking library is not supported for wasm32
3404#[cfg(all(test, not(target_family = "wasm")))]
3405pub(crate) mod tests {
3406 use std::{sync::Arc, time::Duration};
3407
3408 use assert_matches::assert_matches;
3409 use assert_matches2::assert_let;
3410 use eyeball::SharedObservable;
3411 use futures_util::{FutureExt, StreamExt, pin_mut};
3412 use js_int::{UInt, uint};
3413 use matrix_sdk_base::{
3414 RoomState,
3415 store::{MemoryStore, StoreConfig},
3416 };
3417 use matrix_sdk_test::{
3418 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3419 event_factory::EventFactory,
3420 };
3421 #[cfg(target_family = "wasm")]
3422 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3423
3424 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3425 use ruma::{
3426 RoomId, ServerName, UserId,
3427 api::{
3428 FeatureFlag, MatrixVersion,
3429 client::{
3430 discovery::discover_homeserver::RtcFocusInfo,
3431 room::create_room::v3::Request as CreateRoomRequest,
3432 },
3433 },
3434 assign,
3435 events::{
3436 ignored_user_list::IgnoredUserListEventContent,
3437 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3438 },
3439 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3440 };
3441 use serde_json::json;
3442 use stream_assert::{assert_next_matches, assert_pending};
3443 use tokio::{
3444 spawn,
3445 time::{sleep, timeout},
3446 };
3447 use url::Url;
3448
3449 use super::Client;
3450 use crate::{
3451 Error, TransmissionProgress,
3452 client::{WeakClient, futures::SendMediaUploadRequest},
3453 config::{RequestConfig, SyncSettings},
3454 futures::SendRequest,
3455 media::MediaError,
3456 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3457 };
3458
3459 #[async_test]
3460 async fn test_account_data() {
3461 let server = MatrixMockServer::new().await;
3462 let client = server.client_builder().build().await;
3463
3464 let f = EventFactory::new();
3465 server
3466 .mock_sync()
3467 .ok_and_run(&client, |builder| {
3468 builder.add_global_account_data(
3469 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3470 );
3471 })
3472 .await;
3473
3474 let content = client
3475 .account()
3476 .account_data::<IgnoredUserListEventContent>()
3477 .await
3478 .unwrap()
3479 .unwrap()
3480 .deserialize()
3481 .unwrap();
3482
3483 assert_eq!(content.ignored_users.len(), 1);
3484 }
3485
3486 #[async_test]
3487 async fn test_successful_discovery() {
3488 // Imagine this is `matrix.org`.
3489 let server = MatrixMockServer::new().await;
3490 let server_url = server.uri();
3491
3492 // Imagine this is `matrix-client.matrix.org`.
3493 let homeserver = MatrixMockServer::new().await;
3494 let homeserver_url = homeserver.uri();
3495
3496 // Imagine Alice has the user ID `@alice:matrix.org`.
3497 let domain = server_url.strip_prefix("http://").unwrap();
3498 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3499
3500 // The `.well-known` is on the server (e.g. `matrix.org`).
3501 server
3502 .mock_well_known()
3503 .ok_with_homeserver_url(&homeserver_url)
3504 .mock_once()
3505 .named("well-known")
3506 .mount()
3507 .await;
3508
3509 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3510 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3511
3512 let client = Client::builder()
3513 .insecure_server_name_no_tls(alice.server_name())
3514 .build()
3515 .await
3516 .unwrap();
3517
3518 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3519 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3520 client.server_versions().await.unwrap();
3521 }
3522
3523 #[async_test]
3524 async fn test_discovery_broken_server() {
3525 let server = MatrixMockServer::new().await;
3526 let server_url = server.uri();
3527 let domain = server_url.strip_prefix("http://").unwrap();
3528 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3529
3530 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3531
3532 assert!(
3533 Client::builder()
3534 .insecure_server_name_no_tls(alice.server_name())
3535 .build()
3536 .await
3537 .is_err(),
3538 "Creating a client from a user ID should fail when the .well-known request fails."
3539 );
3540 }
3541
3542 #[async_test]
3543 async fn test_room_creation() {
3544 let server = MatrixMockServer::new().await;
3545 let client = server.client_builder().build().await;
3546
3547 let f = EventFactory::new().sender(user_id!("@example:localhost"));
3548 server
3549 .mock_sync()
3550 .ok_and_run(&client, |builder| {
3551 builder.add_joined_room(
3552 JoinedRoomBuilder::default()
3553 .add_state_event(
3554 f.member(user_id!("@example:localhost")).display_name("example"),
3555 )
3556 .add_state_event(f.default_power_levels()),
3557 );
3558 })
3559 .await;
3560
3561 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3562 assert_eq!(room.state(), RoomState::Joined);
3563 }
3564
3565 #[async_test]
3566 async fn test_retry_limit_http_requests() {
3567 let server = MatrixMockServer::new().await;
3568 let client = server
3569 .client_builder()
3570 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3571 .build()
3572 .await;
3573
3574 assert!(client.request_config().retry_limit.unwrap() == 4);
3575
3576 server.mock_who_am_i().error500().expect(4).mount().await;
3577
3578 client.whoami().await.unwrap_err();
3579 }
3580
3581 #[async_test]
3582 async fn test_retry_timeout_http_requests() {
3583 // Keep this timeout small so that the test doesn't take long
3584 let retry_timeout = Duration::from_secs(5);
3585 let server = MatrixMockServer::new().await;
3586 let client = server
3587 .client_builder()
3588 .on_builder(|builder| {
3589 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3590 })
3591 .build()
3592 .await;
3593
3594 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3595
3596 server.mock_login().error500().expect(2..).mount().await;
3597
3598 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3599 }
3600
3601 #[async_test]
3602 async fn test_short_retry_initial_http_requests() {
3603 let server = MatrixMockServer::new().await;
3604 let client = server
3605 .client_builder()
3606 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3607 .build()
3608 .await;
3609
3610 server.mock_login().error500().expect(3..).mount().await;
3611
3612 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3613 }
3614
3615 #[async_test]
3616 async fn test_no_retry_http_requests() {
3617 let server = MatrixMockServer::new().await;
3618 let client = server.client_builder().build().await;
3619
3620 server.mock_devices().error500().mock_once().mount().await;
3621
3622 client.devices().await.unwrap_err();
3623 }
3624
3625 #[async_test]
3626 async fn test_set_homeserver() {
3627 let client = MockClientBuilder::new(None).build().await;
3628 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3629
3630 let homeserver = Url::parse("http://example.com/").unwrap();
3631 client.set_homeserver(homeserver.clone());
3632 assert_eq!(client.homeserver(), homeserver);
3633 }
3634
3635 #[async_test]
3636 async fn test_search_user_request() {
3637 let server = MatrixMockServer::new().await;
3638 let client = server.client_builder().build().await;
3639
3640 server.mock_user_directory().ok().mock_once().mount().await;
3641
3642 let response = client.search_users("test", 50).await.unwrap();
3643 assert_eq!(response.results.len(), 1);
3644 let result = &response.results[0];
3645 assert_eq!(result.user_id.to_string(), "@test:example.me");
3646 assert_eq!(result.display_name.clone().unwrap(), "Test");
3647 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3648 assert!(!response.limited);
3649 }
3650
3651 #[async_test]
3652 async fn test_request_unstable_features() {
3653 let server = MatrixMockServer::new().await;
3654 let client = server.client_builder().no_server_versions().build().await;
3655
3656 server
3657 .mock_versions()
3658 .with_feature("org.matrix.e2e_cross_signing", true)
3659 .ok()
3660 .mock_once()
3661 .mount()
3662 .await;
3663
3664 let unstable_features = client.unstable_features().await.unwrap();
3665 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3666 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3667 }
3668
3669 #[async_test]
3670 async fn test_can_homeserver_push_encrypted_event_to_device() {
3671 let server = MatrixMockServer::new().await;
3672 let client = server.client_builder().no_server_versions().build().await;
3673
3674 server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3675
3676 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3677 assert!(msc4028_enabled);
3678 }
3679
3680 #[async_test]
3681 async fn test_recently_visited_rooms() {
3682 // Tracking recently visited rooms requires authentication
3683 let client = MockClientBuilder::new(None).unlogged().build().await;
3684 assert_matches!(
3685 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3686 Err(Error::AuthenticationRequired)
3687 );
3688
3689 let client = MockClientBuilder::new(None).build().await;
3690 let account = client.account();
3691
3692 // We should start off with an empty list
3693 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3694
3695 // Tracking a valid room id should add it to the list
3696 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3697 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3698 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3699
3700 // And the existing list shouldn't be changed
3701 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3702 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3703
3704 // Tracking the same room again shouldn't change the list
3705 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3706 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3707 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3708
3709 // Tracking a second room should add it to the front of the list
3710 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3711 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3712 assert_eq!(
3713 account.get_recently_visited_rooms().await.unwrap(),
3714 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3715 );
3716
3717 // Tracking the first room yet again should move it to the front of the list
3718 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3719 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3720 assert_eq!(
3721 account.get_recently_visited_rooms().await.unwrap(),
3722 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3723 );
3724
3725 // Tracking should be capped at 20
3726 for n in 0..20 {
3727 account
3728 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3729 .await
3730 .unwrap();
3731 }
3732
3733 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3734
3735 // And the initial rooms should've been pushed out
3736 let rooms = account.get_recently_visited_rooms().await.unwrap();
3737 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3738 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3739
3740 // And the last tracked room should be the first
3741 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3742 }
3743
3744 #[async_test]
3745 async fn test_client_no_cycle_with_event_cache() {
3746 let client = MockClientBuilder::new(None).build().await;
3747
3748 // Wait for the init tasks to die.
3749 sleep(Duration::from_secs(1)).await;
3750
3751 let weak_client = WeakClient::from_client(&client);
3752 assert_eq!(weak_client.strong_count(), 1);
3753
3754 {
3755 let room_id = room_id!("!room:example.org");
3756
3757 // Have the client know the room.
3758 let response = SyncResponseBuilder::default()
3759 .add_joined_room(JoinedRoomBuilder::new(room_id))
3760 .build_sync_response();
3761 client.inner.base_client.receive_sync_response(response).await.unwrap();
3762
3763 client.event_cache().subscribe().unwrap();
3764
3765 let (_room_event_cache, _drop_handles) =
3766 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3767 }
3768
3769 drop(client);
3770
3771 // Give a bit of time for background tasks to die.
3772 sleep(Duration::from_secs(1)).await;
3773
3774 // The weak client must be the last reference to the client now.
3775 assert_eq!(weak_client.strong_count(), 0);
3776 let client = weak_client.get();
3777 assert!(
3778 client.is_none(),
3779 "too many strong references to the client: {}",
3780 Arc::strong_count(&client.unwrap().inner)
3781 );
3782 }
3783
3784 #[async_test]
3785 async fn test_supported_versions_caching() {
3786 let server = MatrixMockServer::new().await;
3787
3788 let versions_mock = server
3789 .mock_versions()
3790 .expect_default_access_token()
3791 .with_feature("org.matrix.e2e_cross_signing", true)
3792 .ok()
3793 .named("first versions mock")
3794 .expect(1)
3795 .mount_as_scoped()
3796 .await;
3797
3798 let memory_store = Arc::new(MemoryStore::new());
3799 let client = server
3800 .client_builder()
3801 .no_server_versions()
3802 .on_builder(|builder| {
3803 builder.store_config(
3804 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3805 .state_store(memory_store.clone()),
3806 )
3807 })
3808 .build()
3809 .await;
3810
3811 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3812
3813 // The result was cached.
3814 assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3815 // This subsequent call hits the in-memory cache.
3816 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3817
3818 drop(client);
3819
3820 let client = server
3821 .client_builder()
3822 .no_server_versions()
3823 .on_builder(|builder| {
3824 builder.store_config(
3825 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3826 .state_store(memory_store.clone()),
3827 )
3828 })
3829 .build()
3830 .await;
3831
3832 // These calls to the new client hit the on-disk cache.
3833 assert!(
3834 client
3835 .unstable_features()
3836 .await
3837 .unwrap()
3838 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3839 );
3840 let supported = client.supported_versions().await.unwrap();
3841 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3842 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3843
3844 // Then this call hits the in-memory cache.
3845 let supported = client.supported_versions().await.unwrap();
3846 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3847 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3848
3849 drop(versions_mock);
3850
3851 // Now, reset the cache, and observe the endpoints being called again once.
3852 client.reset_supported_versions().await.unwrap();
3853
3854 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3855
3856 // Hits network again.
3857 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3858 // Hits in-memory cache again.
3859 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3860 }
3861
3862 #[async_test]
3863 async fn test_well_known_caching() {
3864 let server = MatrixMockServer::new().await;
3865 let server_url = server.uri();
3866 let domain = server_url.strip_prefix("http://").unwrap();
3867 let server_name = <&ServerName>::try_from(domain).unwrap();
3868 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3869
3870 let well_known_mock = server
3871 .mock_well_known()
3872 .ok()
3873 .named("well known mock")
3874 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3875 .mount_as_scoped()
3876 .await;
3877
3878 let memory_store = Arc::new(MemoryStore::new());
3879 let client = Client::builder()
3880 .insecure_server_name_no_tls(server_name)
3881 .store_config(
3882 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3883 .state_store(memory_store.clone()),
3884 )
3885 .build()
3886 .await
3887 .unwrap();
3888
3889 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3890
3891 // This subsequent call hits the in-memory cache.
3892 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3893
3894 drop(client);
3895
3896 let client = server
3897 .client_builder()
3898 .no_server_versions()
3899 .on_builder(|builder| {
3900 builder.store_config(
3901 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3902 .state_store(memory_store.clone()),
3903 )
3904 })
3905 .build()
3906 .await;
3907
3908 // This call to the new client hits the on-disk cache.
3909 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3910
3911 // Then this call hits the in-memory cache.
3912 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3913
3914 drop(well_known_mock);
3915
3916 // Now, reset the cache, and observe the endpoints being called again once.
3917 client.reset_well_known().await.unwrap();
3918
3919 server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3920
3921 // Hits network again.
3922 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3923 // Hits in-memory cache again.
3924 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3925 }
3926
3927 #[async_test]
3928 async fn test_missing_well_known_caching() {
3929 let server = MatrixMockServer::new().await;
3930 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3931
3932 let well_known_mock = server
3933 .mock_well_known()
3934 .error_unrecognized()
3935 .named("first well-known mock")
3936 .expect(1)
3937 .mount_as_scoped()
3938 .await;
3939
3940 let memory_store = Arc::new(MemoryStore::new());
3941 let client = server
3942 .client_builder()
3943 .on_builder(|builder| {
3944 builder.store_config(
3945 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3946 .state_store(memory_store.clone()),
3947 )
3948 })
3949 .build()
3950 .await;
3951
3952 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3953
3954 // This subsequent call hits the in-memory cache.
3955 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3956
3957 drop(client);
3958
3959 let client = server
3960 .client_builder()
3961 .on_builder(|builder| {
3962 builder.store_config(
3963 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3964 .state_store(memory_store.clone()),
3965 )
3966 })
3967 .build()
3968 .await;
3969
3970 // This call to the new client hits the on-disk cache.
3971 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3972
3973 // Then this call hits the in-memory cache.
3974 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3975
3976 drop(well_known_mock);
3977
3978 // Now, reset the cache, and observe the endpoints being called again once.
3979 client.reset_well_known().await.unwrap();
3980
3981 server
3982 .mock_well_known()
3983 .error_unrecognized()
3984 .expect(1)
3985 .named("second well-known mock")
3986 .mount()
3987 .await;
3988
3989 // Hits network again.
3990 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3991 // Hits in-memory cache again.
3992 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3993 }
3994
3995 #[async_test]
3996 async fn test_no_network_doesnt_cause_infinite_retries() {
3997 // We want infinite retries for transient errors.
3998 let client = MockClientBuilder::new(None)
3999 .on_builder(|builder| builder.request_config(RequestConfig::new()))
4000 .build()
4001 .await;
4002
4003 // We don't define a mock server on purpose here, so that the error is really a
4004 // network error.
4005 client.whoami().await.unwrap_err();
4006 }
4007
4008 #[async_test]
4009 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4010 let server = MatrixMockServer::new().await;
4011 let client = server.client_builder().build().await;
4012
4013 let room_id = room_id!("!room:example.org");
4014
4015 server
4016 .mock_sync()
4017 .ok_and_run(&client, |builder| {
4018 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4019 })
4020 .await;
4021
4022 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4023 assert_eq!(room.room_id(), room_id);
4024 }
4025
4026 #[async_test]
4027 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4028 let server = MatrixMockServer::new().await;
4029 let client = server.client_builder().build().await;
4030
4031 let room_id = room_id!("!room:example.org");
4032
4033 let client = Arc::new(client);
4034
4035 // Perform the /sync request with a delay so it starts after the
4036 // `await_room_remote_echo` call has happened
4037 spawn({
4038 let client = client.clone();
4039 async move {
4040 sleep(Duration::from_millis(100)).await;
4041
4042 server
4043 .mock_sync()
4044 .ok_and_run(&client, |builder| {
4045 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4046 })
4047 .await;
4048 }
4049 });
4050
4051 let room =
4052 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4053 assert_eq!(room.room_id(), room_id);
4054 }
4055
4056 #[async_test]
4057 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4058 let client = MockClientBuilder::new(None).build().await;
4059
4060 let room_id = room_id!("!room:example.org");
4061 // Room is not present so the client won't be able to find it. The call will
4062 // timeout.
4063 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4064 }
4065
4066 #[async_test]
4067 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4068 let server = MatrixMockServer::new().await;
4069 let client = server.client_builder().build().await;
4070
4071 server.mock_create_room().ok().mount().await;
4072
4073 // Create a room in the internal store
4074 let room = client
4075 .create_room(assign!(CreateRoomRequest::new(), {
4076 invite: vec![],
4077 is_direct: false,
4078 }))
4079 .await
4080 .unwrap();
4081
4082 // Room is locally present, but not synced, the call will timeout
4083 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4084 .await
4085 .unwrap_err();
4086 }
4087
4088 #[async_test]
4089 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4090 let server = MatrixMockServer::new().await;
4091 let client = server.client_builder().build().await;
4092
4093 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4094
4095 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4096 assert_matches!(ret, Ok(true));
4097 }
4098
4099 #[async_test]
4100 async fn test_is_room_alias_available_if_alias_is_resolved() {
4101 let server = MatrixMockServer::new().await;
4102 let client = server.client_builder().build().await;
4103
4104 server
4105 .mock_room_directory_resolve_alias()
4106 .ok("!some_room_id:matrix.org", Vec::new())
4107 .expect(1)
4108 .mount()
4109 .await;
4110
4111 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4112 assert_matches!(ret, Ok(false));
4113 }
4114
4115 #[async_test]
4116 async fn test_is_room_alias_available_if_error_found() {
4117 let server = MatrixMockServer::new().await;
4118 let client = server.client_builder().build().await;
4119
4120 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4121
4122 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4123 assert_matches!(ret, Err(_));
4124 }
4125
4126 #[async_test]
4127 async fn test_create_room_alias() {
4128 let server = MatrixMockServer::new().await;
4129 let client = server.client_builder().build().await;
4130
4131 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4132
4133 let ret = client
4134 .create_room_alias(
4135 room_alias_id!("#some_alias:matrix.org"),
4136 room_id!("!some_room:matrix.org"),
4137 )
4138 .await;
4139 assert_matches!(ret, Ok(()));
4140 }
4141
4142 #[async_test]
4143 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4144 let server = MatrixMockServer::new().await;
4145 let client = server.client_builder().build().await;
4146
4147 let room_id = room_id!("!a-room:matrix.org");
4148
4149 // Make sure the summary endpoint is called once
4150 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4151
4152 // We create a locally cached invited room
4153 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4154
4155 // And we get a preview, the server endpoint was reached
4156 let preview = client
4157 .get_room_preview(room_id.into(), Vec::new())
4158 .await
4159 .expect("Room preview should be retrieved");
4160
4161 assert_eq!(invited_room.room_id(), preview.room_id);
4162 }
4163
4164 #[async_test]
4165 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4166 let server = MatrixMockServer::new().await;
4167 let client = server.client_builder().build().await;
4168
4169 let room_id = room_id!("!a-room:matrix.org");
4170
4171 // Make sure the summary endpoint is called once
4172 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4173
4174 // We create a locally cached left room
4175 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4176
4177 // And we get a preview, the server endpoint was reached
4178 let preview = client
4179 .get_room_preview(room_id.into(), Vec::new())
4180 .await
4181 .expect("Room preview should be retrieved");
4182
4183 assert_eq!(left_room.room_id(), preview.room_id);
4184 }
4185
4186 #[async_test]
4187 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4188 let server = MatrixMockServer::new().await;
4189 let client = server.client_builder().build().await;
4190
4191 let room_id = room_id!("!a-room:matrix.org");
4192
4193 // Make sure the summary endpoint is called once
4194 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4195
4196 // We create a locally cached knocked room
4197 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4198
4199 // And we get a preview, the server endpoint was reached
4200 let preview = client
4201 .get_room_preview(room_id.into(), Vec::new())
4202 .await
4203 .expect("Room preview should be retrieved");
4204
4205 assert_eq!(knocked_room.room_id(), preview.room_id);
4206 }
4207
4208 #[async_test]
4209 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4210 let server = MatrixMockServer::new().await;
4211 let client = server.client_builder().build().await;
4212
4213 let room_id = room_id!("!a-room:matrix.org");
4214
4215 // Make sure the summary endpoint is not called
4216 server.mock_room_summary().ok(room_id).never().mount().await;
4217
4218 // We create a locally cached joined room
4219 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4220
4221 // And we get a preview, no server endpoint was reached
4222 let preview = client
4223 .get_room_preview(room_id.into(), Vec::new())
4224 .await
4225 .expect("Room preview should be retrieved");
4226
4227 assert_eq!(joined_room.room_id(), preview.room_id);
4228 }
4229
4230 #[async_test]
4231 async fn test_media_preview_config() {
4232 let server = MatrixMockServer::new().await;
4233 let client = server.client_builder().build().await;
4234
4235 server
4236 .mock_sync()
4237 .ok_and_run(&client, |builder| {
4238 builder.add_custom_global_account_data(json!({
4239 "content": {
4240 "media_previews": "private",
4241 "invite_avatars": "off"
4242 },
4243 "type": "m.media_preview_config"
4244 }));
4245 })
4246 .await;
4247
4248 let (initial_value, stream) =
4249 client.account().observe_media_preview_config().await.unwrap();
4250
4251 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4252 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4253 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4254 pin_mut!(stream);
4255 assert_pending!(stream);
4256
4257 server
4258 .mock_sync()
4259 .ok_and_run(&client, |builder| {
4260 builder.add_custom_global_account_data(json!({
4261 "content": {
4262 "media_previews": "off",
4263 "invite_avatars": "on"
4264 },
4265 "type": "m.media_preview_config"
4266 }));
4267 })
4268 .await;
4269
4270 assert_next_matches!(
4271 stream,
4272 MediaPreviewConfigEventContent {
4273 media_previews: Some(MediaPreviews::Off),
4274 invite_avatars: Some(InviteAvatars::On),
4275 ..
4276 }
4277 );
4278 assert_pending!(stream);
4279 }
4280
4281 #[async_test]
4282 async fn test_unstable_media_preview_config() {
4283 let server = MatrixMockServer::new().await;
4284 let client = server.client_builder().build().await;
4285
4286 server
4287 .mock_sync()
4288 .ok_and_run(&client, |builder| {
4289 builder.add_custom_global_account_data(json!({
4290 "content": {
4291 "media_previews": "private",
4292 "invite_avatars": "off"
4293 },
4294 "type": "io.element.msc4278.media_preview_config"
4295 }));
4296 })
4297 .await;
4298
4299 let (initial_value, stream) =
4300 client.account().observe_media_preview_config().await.unwrap();
4301
4302 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4303 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4304 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4305 pin_mut!(stream);
4306 assert_pending!(stream);
4307
4308 server
4309 .mock_sync()
4310 .ok_and_run(&client, |builder| {
4311 builder.add_custom_global_account_data(json!({
4312 "content": {
4313 "media_previews": "off",
4314 "invite_avatars": "on"
4315 },
4316 "type": "io.element.msc4278.media_preview_config"
4317 }));
4318 })
4319 .await;
4320
4321 assert_next_matches!(
4322 stream,
4323 MediaPreviewConfigEventContent {
4324 media_previews: Some(MediaPreviews::Off),
4325 invite_avatars: Some(InviteAvatars::On),
4326 ..
4327 }
4328 );
4329 assert_pending!(stream);
4330 }
4331
4332 #[async_test]
4333 async fn test_media_preview_config_not_found() {
4334 let server = MatrixMockServer::new().await;
4335 let client = server.client_builder().build().await;
4336
4337 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4338
4339 assert!(initial_value.is_none());
4340 }
4341
4342 #[async_test]
4343 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4344 // The default Matrix version we use is 1.11 or higher, so authenticated media
4345 // is supported.
4346 let server = MatrixMockServer::new().await;
4347 let client = server.client_builder().build().await;
4348
4349 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4350
4351 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4352 client.load_or_fetch_max_upload_size().await.unwrap();
4353
4354 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4355 }
4356
4357 #[async_test]
4358 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4359 // The server must advertise support for the stable feature for authenticated
4360 // media support, so we mock the `GET /versions` response.
4361 let server = MatrixMockServer::new().await;
4362 let client = server.client_builder().no_server_versions().build().await;
4363
4364 server
4365 .mock_versions()
4366 .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4367 .with_feature("org.matrix.msc3916.stable", true)
4368 .ok()
4369 .named("versions")
4370 .expect(1)
4371 .mount()
4372 .await;
4373
4374 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4375
4376 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4377 client.load_or_fetch_max_upload_size().await.unwrap();
4378
4379 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4380 }
4381
4382 #[async_test]
4383 async fn test_load_or_fetch_max_upload_size_no_auth() {
4384 // The server must not support Matrix 1.11 or higher for unauthenticated
4385 // media requests, so we mock the `GET /versions` response.
4386 let server = MatrixMockServer::new().await;
4387 let client = server.client_builder().no_server_versions().build().await;
4388
4389 server
4390 .mock_versions()
4391 .with_versions(vec!["v1.1"])
4392 .ok()
4393 .named("versions")
4394 .expect(1)
4395 .mount()
4396 .await;
4397
4398 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4399
4400 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4401 client.load_or_fetch_max_upload_size().await.unwrap();
4402
4403 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4404 }
4405
4406 #[async_test]
4407 async fn test_uploading_a_too_large_media_file() {
4408 let server = MatrixMockServer::new().await;
4409 let client = server.client_builder().build().await;
4410
4411 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4412 client.load_or_fetch_max_upload_size().await.unwrap();
4413 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4414
4415 let data = vec![1, 2];
4416 let upload_request =
4417 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4418 let request = SendRequest {
4419 client: client.clone(),
4420 request: upload_request,
4421 config: None,
4422 send_progress: SharedObservable::new(TransmissionProgress::default()),
4423 };
4424 let media_request = SendMediaUploadRequest::new(request);
4425
4426 let error = media_request.await.err();
4427 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4428 assert_eq!(max, uint!(1));
4429 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4430 }
4431
4432 #[async_test]
4433 async fn test_dont_ignore_timeout_on_first_sync() {
4434 let server = MatrixMockServer::new().await;
4435 let client = server.client_builder().build().await;
4436
4437 server
4438 .mock_sync()
4439 .timeout(Some(Duration::from_secs(30)))
4440 .ok(|_| {})
4441 .mock_once()
4442 .named("sync_with_timeout")
4443 .mount()
4444 .await;
4445
4446 // Call the endpoint once to check the timeout.
4447 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4448
4449 timeout(Duration::from_secs(1), async {
4450 stream.next().await.unwrap().unwrap();
4451 })
4452 .await
4453 .unwrap();
4454 }
4455
4456 #[async_test]
4457 async fn test_ignore_timeout_on_first_sync() {
4458 let server = MatrixMockServer::new().await;
4459 let client = server.client_builder().build().await;
4460
4461 server
4462 .mock_sync()
4463 .timeout(None)
4464 .ok(|_| {})
4465 .mock_once()
4466 .named("sync_no_timeout")
4467 .mount()
4468 .await;
4469 server
4470 .mock_sync()
4471 .timeout(Some(Duration::from_secs(30)))
4472 .ok(|_| {})
4473 .mock_once()
4474 .named("sync_with_timeout")
4475 .mount()
4476 .await;
4477
4478 // Call each version of the endpoint once to check the timeouts.
4479 let mut stream = Box::pin(
4480 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4481 );
4482
4483 timeout(Duration::from_secs(1), async {
4484 stream.next().await.unwrap().unwrap();
4485 stream.next().await.unwrap().unwrap();
4486 })
4487 .await
4488 .unwrap();
4489 }
4490
4491 #[async_test]
4492 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4493 let server = MatrixMockServer::new().await;
4494 let client = server.client_builder().build().await;
4495 // This is the user ID that is inside MemberAdditional.
4496 // Note the confusing username, so we can share
4497 // GlobalAccountDataTestEvent::Direct with the invited test.
4498 let user_id = user_id!("@invited:localhost");
4499
4500 // When we receive a sync response saying "invited" is invited to a DM
4501 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4502 let response = SyncResponseBuilder::default()
4503 .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4504 .add_global_account_data(
4505 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4506 )
4507 .build_sync_response();
4508 client.base_client().receive_sync_response(response).await.unwrap();
4509
4510 // Then get_dm_room finds this room
4511 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4512 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4513 }
4514
4515 #[async_test]
4516 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4517 let server = MatrixMockServer::new().await;
4518 let client = server.client_builder().build().await;
4519 // This is the user ID that is inside MemberInvite
4520 let user_id = user_id!("@invited:localhost");
4521
4522 // When we receive a sync response saying "invited" is invited to a DM
4523 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4524 let response = SyncResponseBuilder::default()
4525 .add_joined_room(
4526 JoinedRoomBuilder::default()
4527 .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4528 )
4529 .add_global_account_data(
4530 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4531 )
4532 .build_sync_response();
4533 client.base_client().receive_sync_response(response).await.unwrap();
4534
4535 // Then get_dm_room finds this room
4536 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4537 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4538 }
4539
4540 #[async_test]
4541 async fn test_get_dm_room_still_finds_left_room() {
4542 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4543 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4544
4545 let server = MatrixMockServer::new().await;
4546 let client = server.client_builder().build().await;
4547 // This is the user ID that is inside MemberAdditional.
4548 // Note the confusing username, so we can share
4549 // GlobalAccountDataTestEvent::Direct with the invited test.
4550 let user_id = user_id!("@invited:localhost");
4551
4552 // When we receive a sync response saying "invited" has left a DM
4553 let f = EventFactory::new().sender(user_id);
4554 let response = SyncResponseBuilder::default()
4555 .add_joined_room(
4556 JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4557 )
4558 .add_global_account_data(
4559 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4560 )
4561 .build_sync_response();
4562 client.base_client().receive_sync_response(response).await.unwrap();
4563
4564 // Then get_dm_room finds this room
4565 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4566 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4567 }
4568
4569 #[async_test]
4570 async fn test_device_exists() {
4571 let server = MatrixMockServer::new().await;
4572 let client = server.client_builder().build().await;
4573
4574 server.mock_get_device().ok().expect(1).mount().await;
4575
4576 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4577 }
4578
4579 #[async_test]
4580 async fn test_device_exists_404() {
4581 let server = MatrixMockServer::new().await;
4582 let client = server.client_builder().build().await;
4583
4584 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4585 }
4586
4587 #[async_test]
4588 async fn test_device_exists_500() {
4589 let server = MatrixMockServer::new().await;
4590 let client = server.client_builder().build().await;
4591
4592 server.mock_get_device().error500().expect(1).mount().await;
4593
4594 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4595 }
4596
4597 #[async_test]
4598 async fn test_fetching_well_known_with_homeserver_url() {
4599 let server = MatrixMockServer::new().await;
4600 let client = server.client_builder().build().await;
4601 server.mock_well_known().ok().mount().await;
4602
4603 assert_matches!(client.fetch_client_well_known().await, Some(_));
4604 }
4605
4606 #[async_test]
4607 async fn test_fetching_well_known_with_server_name() {
4608 let server = MatrixMockServer::new().await;
4609 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4610
4611 server.mock_well_known().ok().mount().await;
4612
4613 let client = MockClientBuilder::new(None)
4614 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4615 .build()
4616 .await;
4617
4618 assert_matches!(client.fetch_client_well_known().await, Some(_));
4619 }
4620
4621 #[async_test]
4622 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4623 let server = MatrixMockServer::new().await;
4624 server.mock_well_known().ok().mount().await;
4625
4626 let user_id =
4627 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4628 let client = MockClientBuilder::new(None)
4629 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4630 .build()
4631 .await;
4632
4633 assert_matches!(client.fetch_client_well_known().await, Some(_));
4634 }
4635}