matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32 DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
36 StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
37 event_cache::store::EventCacheStoreLock,
38 media::store::MediaStoreLock,
39 store::{
40 DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
41 WellKnownResponse,
42 },
43 sync::{Notification, RoomUpdates},
44 task_monitor::TaskMonitor,
45};
46use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl_cache::TtlCache};
47#[cfg(feature = "e2e-encryption")]
48use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
49use ruma::{
50 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
51 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
52 api::{
53 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
54 client::{
55 account::whoami,
56 alias::{create_alias, delete_alias, get_alias},
57 authenticated_media,
58 device::{self, delete_devices, get_devices, update_device},
59 directory::{get_public_rooms, get_public_rooms_filtered},
60 discovery::{
61 discover_homeserver::{self, RtcFocusInfo},
62 get_capabilities::{self, v3::Capabilities},
63 get_supported_versions,
64 },
65 error::{ErrorKind, UnknownTokenErrorData},
66 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
67 knock::knock_room,
68 media,
69 membership::{join_room_by_id, join_room_by_id_or_alias},
70 room::create_room,
71 session::login::v3::DiscoveryInfo,
72 sync::sync_events,
73 threads::get_thread_subscriptions_changes,
74 uiaa,
75 user_directory::search_users,
76 },
77 error::FromHttpResponseError,
78 path_builder::PathBuilder,
79 },
80 assign,
81 events::direct::DirectUserIdentifier,
82 push::Ruleset,
83 time::Instant,
84};
85use serde::de::DeserializeOwned;
86use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
87use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
88use url::Url;
89
90use self::{
91 caches::{CachedValue, ClientCaches},
92 futures::SendRequest,
93};
94use crate::{
95 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
96 Room, SessionTokens, TransmissionProgress,
97 authentication::{
98 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
99 oauth::OAuth,
100 },
101 client::thread_subscriptions::ThreadSubscriptionCatchup,
102 config::{RequestConfig, SyncToken},
103 deduplicating_handler::DeduplicatingHandler,
104 error::HttpResult,
105 event_cache::EventCache,
106 event_handler::{
107 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
108 EventHandlerStore, ObservableEventHandler, SyncEvent,
109 },
110 http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
111 latest_events::LatestEvents,
112 media::MediaError,
113 notification_settings::NotificationSettings,
114 room::RoomMember,
115 room_preview::RoomPreview,
116 send_queue::{SendQueue, SendQueueData},
117 sliding_sync::Version as SlidingSyncVersion,
118 sync::{RoomUpdate, SyncResponse},
119};
120#[cfg(feature = "e2e-encryption")]
121use crate::{
122 cross_process_lock::CrossProcessLock,
123 encryption::{
124 DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
125 VerificationState,
126 },
127};
128
129mod builder;
130pub(crate) mod caches;
131pub(crate) mod futures;
132pub(crate) mod thread_subscriptions;
133
134pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
135#[cfg(feature = "experimental-search")]
136use crate::search_index::SearchIndex;
137
138#[cfg(not(target_family = "wasm"))]
139type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
140#[cfg(target_family = "wasm")]
141type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
142
143#[cfg(not(target_family = "wasm"))]
144type NotificationHandlerFn =
145 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
146#[cfg(target_family = "wasm")]
147type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
148
149/// Enum controlling if a loop running callbacks should continue or abort.
150///
151/// This is mainly used in the [`sync_with_callback`] method, the return value
152/// of the provided callback controls if the sync loop should be exited.
153///
154/// [`sync_with_callback`]: #method.sync_with_callback
155#[derive(Debug, Clone, Copy, PartialEq, Eq)]
156pub enum LoopCtrl {
157 /// Continue running the loop.
158 Continue,
159 /// Break out of the loop.
160 Break,
161}
162
163/// Represents changes that can occur to a `Client`s `Session`.
164#[derive(Debug, Clone, PartialEq)]
165pub enum SessionChange {
166 /// The session's token is no longer valid.
167 UnknownToken(UnknownTokenErrorData),
168 /// The session's tokens have been refreshed.
169 TokensRefreshed,
170}
171
172/// Information about the server vendor obtained from the federation API.
173#[derive(Debug, Clone, PartialEq, Eq)]
174#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
175pub struct ServerVendorInfo {
176 /// The server name.
177 pub server_name: String,
178 /// The server version.
179 pub version: String,
180}
181
182/// An async/await enabled Matrix client.
183///
184/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
185#[derive(Clone)]
186pub struct Client {
187 pub(crate) inner: Arc<ClientInner>,
188}
189
190#[derive(Default)]
191pub(crate) struct ClientLocks {
192 /// Lock ensuring that only a single room may be marked as a DM at once.
193 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
194 /// explanation.
195 pub(crate) mark_as_dm_lock: Mutex<()>,
196
197 /// Lock ensuring that only a single secret store is getting opened at the
198 /// same time.
199 ///
200 /// This is important so we don't accidentally create multiple different new
201 /// default secret storage keys.
202 #[cfg(feature = "e2e-encryption")]
203 pub(crate) open_secret_store_lock: Mutex<()>,
204
205 /// Lock ensuring that we're only storing a single secret at a time.
206 ///
207 /// Take a look at the [`SecretStore::put_secret`] method for a more
208 /// detailed explanation.
209 ///
210 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
211 #[cfg(feature = "e2e-encryption")]
212 pub(crate) store_secret_lock: Mutex<()>,
213
214 /// Lock ensuring that only one method at a time might modify our backup.
215 #[cfg(feature = "e2e-encryption")]
216 pub(crate) backup_modify_lock: Mutex<()>,
217
218 /// Lock ensuring that we're going to attempt to upload backups for a single
219 /// requester.
220 #[cfg(feature = "e2e-encryption")]
221 pub(crate) backup_upload_lock: Mutex<()>,
222
223 /// Handler making sure we only have one group session sharing request in
224 /// flight per room.
225 #[cfg(feature = "e2e-encryption")]
226 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
227
228 /// Lock making sure we're only doing one key claim request at a time.
229 #[cfg(feature = "e2e-encryption")]
230 pub(crate) key_claim_lock: Mutex<()>,
231
232 /// Handler to ensure that only one members request is running at a time,
233 /// given a room.
234 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
235
236 /// Handler to ensure that only one encryption state request is running at a
237 /// time, given a room.
238 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
239
240 /// Deduplicating handler for sending read receipts. The string is an
241 /// internal implementation detail, see [`Self::send_single_receipt`].
242 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
243
244 #[cfg(feature = "e2e-encryption")]
245 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
246
247 /// Latest "generation" of data known by the crypto store.
248 ///
249 /// This is a counter that only increments, set in the database (and can
250 /// wrap). It's incremented whenever some process acquires a lock for the
251 /// first time. *This assumes the crypto store lock is being held, to
252 /// avoid data races on writing to this value in the store*.
253 ///
254 /// The current process will maintain this value in local memory and in the
255 /// DB over time. Observing a different value than the one read in
256 /// memory, when reading from the store indicates that somebody else has
257 /// written into the database under our feet.
258 ///
259 /// TODO: this should live in the `OlmMachine`, since it's information
260 /// related to the lock. As of today (2023-07-28), we blow up the entire
261 /// olm machine when there's a generation mismatch. So storing the
262 /// generation in the olm machine would make the client think there's
263 /// *always* a mismatch, and that's why we need to store the generation
264 /// outside the `OlmMachine`.
265 #[cfg(feature = "e2e-encryption")]
266 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
267}
268
269pub(crate) struct ClientInner {
270 /// All the data related to authentication and authorization.
271 pub(crate) auth_ctx: Arc<AuthCtx>,
272
273 /// The URL of the server.
274 ///
275 /// Not to be confused with the `Self::homeserver`. `server` is usually
276 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
277 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
278 /// homeserver (at the time of writing — 2024-08-28).
279 ///
280 /// This value is optional depending on how the `Client` has been built.
281 /// If it's been built from a homeserver URL directly, we don't know the
282 /// server. However, if the `Client` has been built from a server URL or
283 /// name, then the homeserver has been discovered, and we know both.
284 server: Option<Url>,
285
286 /// The URL of the homeserver to connect to.
287 ///
288 /// This is the URL for the client-server Matrix API.
289 homeserver: StdRwLock<Url>,
290
291 /// The sliding sync version.
292 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
293
294 /// The underlying HTTP client.
295 pub(crate) http_client: HttpClient,
296
297 /// User session data.
298 pub(super) base_client: BaseClient,
299
300 /// Collection of in-memory caches for the [`Client`].
301 pub(crate) caches: ClientCaches,
302
303 /// Collection of locks individual client methods might want to use, either
304 /// to ensure that only a single call to a method happens at once or to
305 /// deduplicate multiple calls to a method.
306 pub(crate) locks: ClientLocks,
307
308 /// The cross-process lock configuration.
309 ///
310 /// The SDK provides cross-process store locks (see
311 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
312 /// [`CrossProcessLockConfig::MultiProcess`] is used.
313 ///
314 /// If multiple `Client`s are running in different processes, this
315 /// value MUST be different for each `Client`.
316 cross_process_lock_config: CrossProcessLockConfig,
317
318 /// A mapping of the times at which the current user sent typing notices,
319 /// keyed by room.
320 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
321
322 /// Event handlers. See `add_event_handler`.
323 pub(crate) event_handlers: EventHandlerStore,
324
325 /// Notification handlers. See `register_notification_handler`.
326 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
327
328 /// The sender-side of channels used to receive room updates.
329 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
330
331 /// The sender-side of a channel used to observe all the room updates of a
332 /// sync response.
333 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
334
335 /// Whether the client should update its homeserver URL with the discovery
336 /// information present in the login response.
337 respect_login_well_known: bool,
338
339 /// An event that can be listened on to wait for a successful sync. The
340 /// event will only be fired if a sync loop is running. Can be used for
341 /// synchronization, e.g. if we send out a request to create a room, we can
342 /// wait for the sync to get the data to fetch a room object from the state
343 /// store.
344 pub(crate) sync_beat: event_listener::Event,
345
346 /// A central cache for events, inactive first.
347 ///
348 /// It becomes active when [`EventCache::subscribe`] is called.
349 pub(crate) event_cache: OnceCell<EventCache>,
350
351 /// End-to-end encryption related state.
352 #[cfg(feature = "e2e-encryption")]
353 pub(crate) e2ee: EncryptionData,
354
355 /// The verification state of our own device.
356 #[cfg(feature = "e2e-encryption")]
357 pub(crate) verification_state: SharedObservable<VerificationState>,
358
359 /// Whether to enable the experimental support for sending and receiving
360 /// encrypted room history on invite, per [MSC4268].
361 ///
362 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
363 #[cfg(feature = "e2e-encryption")]
364 pub(crate) enable_share_history_on_invite: bool,
365
366 /// Data related to the [`SendQueue`].
367 ///
368 /// [`SendQueue`]: crate::send_queue::SendQueue
369 pub(crate) send_queue_data: Arc<SendQueueData>,
370
371 /// The `max_upload_size` value of the homeserver, it contains the max
372 /// request size you can send.
373 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
374
375 /// The entry point to get the [`LatestEvent`] of rooms and threads.
376 ///
377 /// [`LatestEvent`]: crate::latest_event::LatestEvent
378 latest_events: OnceCell<LatestEvents>,
379
380 /// Service handling the catching up of thread subscriptions in the
381 /// background.
382 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
383
384 #[cfg(feature = "experimental-search")]
385 /// Handler for [`RoomIndex`]'s of each room
386 search_index: SearchIndex,
387
388 /// A monitor for background tasks spawned by the client.
389 pub(crate) task_monitor: TaskMonitor,
390
391 /// A sender to notify subscribers about duplicate key upload errors
392 /// triggered by requests to /keys/upload.
393 #[cfg(feature = "e2e-encryption")]
394 pub(crate) duplicate_key_upload_error_sender:
395 broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
396}
397
398impl ClientInner {
399 /// Create a new `ClientInner`.
400 ///
401 /// All the fields passed as parameters here are those that must be cloned
402 /// upon instantiation of a sub-client, e.g. a client specialized for
403 /// notifications.
404 #[allow(clippy::too_many_arguments)]
405 async fn new(
406 auth_ctx: Arc<AuthCtx>,
407 server: Option<Url>,
408 homeserver: Url,
409 sliding_sync_version: SlidingSyncVersion,
410 http_client: HttpClient,
411 base_client: BaseClient,
412 supported_versions: CachedValue<SupportedVersions>,
413 well_known: CachedValue<Option<WellKnownResponse>>,
414 respect_login_well_known: bool,
415 event_cache: OnceCell<EventCache>,
416 send_queue: Arc<SendQueueData>,
417 latest_events: OnceCell<LatestEvents>,
418 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
419 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
420 cross_process_lock_config: CrossProcessLockConfig,
421 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
422 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
423 ) -> Arc<Self> {
424 let caches = ClientCaches {
425 supported_versions: supported_versions.into(),
426 well_known: well_known.into(),
427 server_metadata: Mutex::new(TtlCache::new()),
428 };
429
430 let client = Self {
431 server,
432 homeserver: StdRwLock::new(homeserver),
433 auth_ctx,
434 sliding_sync_version: StdRwLock::new(sliding_sync_version),
435 http_client,
436 base_client,
437 caches,
438 locks: Default::default(),
439 cross_process_lock_config,
440 typing_notice_times: Default::default(),
441 event_handlers: Default::default(),
442 notification_handlers: Default::default(),
443 room_update_channels: Default::default(),
444 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
445 // ballast for all observers to catch up.
446 room_updates_sender: broadcast::Sender::new(32),
447 respect_login_well_known,
448 sync_beat: event_listener::Event::new(),
449 event_cache,
450 send_queue_data: send_queue,
451 latest_events,
452 #[cfg(feature = "e2e-encryption")]
453 e2ee: EncryptionData::new(encryption_settings),
454 #[cfg(feature = "e2e-encryption")]
455 verification_state: SharedObservable::new(VerificationState::Unknown),
456 #[cfg(feature = "e2e-encryption")]
457 enable_share_history_on_invite,
458 server_max_upload_size: Mutex::new(OnceCell::new()),
459 #[cfg(feature = "experimental-search")]
460 search_index: search_index_handler,
461 thread_subscription_catchup,
462 task_monitor: TaskMonitor::new(),
463 #[cfg(feature = "e2e-encryption")]
464 duplicate_key_upload_error_sender: broadcast::channel(1).0,
465 };
466
467 #[allow(clippy::let_and_return)]
468 let client = Arc::new(client);
469
470 #[cfg(feature = "e2e-encryption")]
471 client.e2ee.initialize_tasks(&client);
472
473 let _ = client
474 .event_cache
475 .get_or_init(|| async {
476 EventCache::new(&client, client.base_client.event_cache_store().clone())
477 })
478 .await;
479
480 let _ = client
481 .thread_subscription_catchup
482 .get_or_init(|| async {
483 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
484 })
485 .await;
486
487 client
488 }
489}
490
491#[cfg(not(tarpaulin_include))]
492impl Debug for Client {
493 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
494 write!(fmt, "Client")
495 }
496}
497
498impl Client {
499 /// Create a new [`Client`] that will use the given homeserver.
500 ///
501 /// # Arguments
502 ///
503 /// * `homeserver_url` - The homeserver that the client should connect to.
504 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
505 Self::builder().homeserver_url(homeserver_url).build().await
506 }
507
508 /// Returns a subscriber that publishes an event every time the ignore user
509 /// list changes.
510 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
511 self.inner.base_client.subscribe_to_ignore_user_list_changes()
512 }
513
514 /// Create a new [`ClientBuilder`].
515 pub fn builder() -> ClientBuilder {
516 ClientBuilder::new()
517 }
518
519 pub(crate) fn base_client(&self) -> &BaseClient {
520 &self.inner.base_client
521 }
522
523 /// The underlying HTTP client.
524 pub fn http_client(&self) -> &reqwest::Client {
525 &self.inner.http_client.inner
526 }
527
528 pub(crate) fn locks(&self) -> &ClientLocks {
529 &self.inner.locks
530 }
531
532 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
533 &self.inner.auth_ctx
534 }
535
536 /// The cross-process store lock configuration used by this [`Client`].
537 ///
538 /// The SDK provides cross-process store locks (see
539 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
540 /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
541 /// the value used for all cross-process store locks used by this
542 /// `Client`.
543 pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
544 &self.inner.cross_process_lock_config
545 }
546
547 /// Change the homeserver URL used by this client.
548 ///
549 /// # Arguments
550 ///
551 /// * `homeserver_url` - The new URL to use.
552 fn set_homeserver(&self, homeserver_url: Url) {
553 *self.inner.homeserver.write().unwrap() = homeserver_url;
554 }
555
556 /// Change to a different homeserver and re-resolve well-known.
557 #[cfg(feature = "e2e-encryption")]
558 pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
559 &self,
560 homeserver_url: Url,
561 ) -> Result<()> {
562 self.set_homeserver(homeserver_url);
563 self.reset_well_known().await?;
564 if let Some(well_known) = self.load_or_fetch_well_known().await? {
565 self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
566 }
567 Ok(())
568 }
569
570 /// Get the capabilities of the homeserver.
571 ///
572 /// This method should be used to check what features are supported by the
573 /// homeserver.
574 ///
575 /// # Examples
576 ///
577 /// ```no_run
578 /// # use matrix_sdk::Client;
579 /// # use url::Url;
580 /// # async {
581 /// # let homeserver = Url::parse("http://example.com")?;
582 /// let client = Client::new(homeserver).await?;
583 ///
584 /// let capabilities = client.get_capabilities().await?;
585 ///
586 /// if capabilities.change_password.enabled {
587 /// // Change password
588 /// }
589 /// # anyhow::Ok(()) };
590 /// ```
591 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
592 let res = self.send(get_capabilities::v3::Request::new()).await?;
593 Ok(res.capabilities)
594 }
595
596 /// Get the server vendor information from the federation API.
597 ///
598 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
599 /// both the server's software name and version.
600 ///
601 /// # Examples
602 ///
603 /// ```no_run
604 /// # use matrix_sdk::Client;
605 /// # use url::Url;
606 /// # async {
607 /// # let homeserver = Url::parse("http://example.com")?;
608 /// let client = Client::new(homeserver).await?;
609 ///
610 /// let server_info = client.server_vendor_info(None).await?;
611 /// println!(
612 /// "Server: {}, Version: {}",
613 /// server_info.server_name, server_info.version
614 /// );
615 /// # anyhow::Ok(()) };
616 /// ```
617 #[cfg(feature = "federation-api")]
618 pub async fn server_vendor_info(
619 &self,
620 request_config: Option<RequestConfig>,
621 ) -> HttpResult<ServerVendorInfo> {
622 use ruma::api::federation::discovery::get_server_version;
623
624 let res = self
625 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
626 .await?;
627
628 // Extract server info, using defaults if fields are missing.
629 let server = res.server.unwrap_or_default();
630 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
631 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
632
633 Ok(ServerVendorInfo { server_name: server_name_str, version })
634 }
635
636 /// Get a copy of the default request config.
637 ///
638 /// The default request config is what's used when sending requests if no
639 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
640 /// function with such a parameter.
641 ///
642 /// If the default request config was not customized through
643 /// [`ClientBuilder`] when creating this `Client`, the returned value will
644 /// be equivalent to [`RequestConfig::default()`].
645 pub fn request_config(&self) -> RequestConfig {
646 self.inner.http_client.request_config
647 }
648
649 /// Check whether the client has been activated.
650 ///
651 /// A client is considered active when:
652 ///
653 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
654 /// is logged in,
655 /// 2. Has loaded cached data from storage,
656 /// 3. If encryption is enabled, it also initialized or restored its
657 /// `OlmMachine`.
658 pub fn is_active(&self) -> bool {
659 self.inner.base_client.is_active()
660 }
661
662 /// The server used by the client.
663 ///
664 /// See `Self::server` to learn more.
665 pub fn server(&self) -> Option<&Url> {
666 self.inner.server.as_ref()
667 }
668
669 /// The homeserver of the client.
670 pub fn homeserver(&self) -> Url {
671 self.inner.homeserver.read().unwrap().clone()
672 }
673
674 /// Get the sliding sync version.
675 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
676 self.inner.sliding_sync_version.read().unwrap().clone()
677 }
678
679 /// Override the sliding sync version.
680 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
681 let mut lock = self.inner.sliding_sync_version.write().unwrap();
682 *lock = version;
683 }
684
685 /// Get the Matrix user session meta information.
686 ///
687 /// If the client is currently logged in, this will return a
688 /// [`SessionMeta`] object which contains the user ID and device ID.
689 /// Otherwise it returns `None`.
690 pub fn session_meta(&self) -> Option<&SessionMeta> {
691 self.base_client().session_meta()
692 }
693
694 /// Returns a receiver that gets events for each room info update. To watch
695 /// for new events, use `receiver.resubscribe()`.
696 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
697 self.base_client().room_info_notable_update_receiver()
698 }
699
700 /// Performs a search for users.
701 /// The search is performed case-insensitively on user IDs and display names
702 ///
703 /// # Arguments
704 ///
705 /// * `search_term` - The search term for the search
706 /// * `limit` - The maximum number of results to return. Defaults to 10.
707 ///
708 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
709 pub async fn search_users(
710 &self,
711 search_term: &str,
712 limit: u64,
713 ) -> HttpResult<search_users::v3::Response> {
714 let mut request = search_users::v3::Request::new(search_term.to_owned());
715
716 if let Some(limit) = UInt::new(limit) {
717 request.limit = limit;
718 }
719
720 self.send(request).await
721 }
722
723 /// Get the user id of the current owner of the client.
724 pub fn user_id(&self) -> Option<&UserId> {
725 self.session_meta().map(|s| s.user_id.as_ref())
726 }
727
728 /// Get the device ID that identifies the current session.
729 pub fn device_id(&self) -> Option<&DeviceId> {
730 self.session_meta().map(|s| s.device_id.as_ref())
731 }
732
733 /// Get the current access token for this session.
734 ///
735 /// Will be `None` if the client has not been logged in.
736 pub fn access_token(&self) -> Option<String> {
737 self.auth_ctx().access_token()
738 }
739
740 /// Get the current tokens for this session.
741 ///
742 /// To be notified of changes in the session tokens, use
743 /// [`Client::subscribe_to_session_changes()`] or
744 /// [`Client::set_session_callbacks()`].
745 ///
746 /// Returns `None` if the client has not been logged in.
747 pub fn session_tokens(&self) -> Option<SessionTokens> {
748 self.auth_ctx().session_tokens()
749 }
750
751 /// Access the authentication API used to log in this client.
752 ///
753 /// Will be `None` if the client has not been logged in.
754 pub fn auth_api(&self) -> Option<AuthApi> {
755 match self.auth_ctx().auth_data.get()? {
756 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
757 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
758 }
759 }
760
761 /// Get the whole session info of this client.
762 ///
763 /// Will be `None` if the client has not been logged in.
764 ///
765 /// Can be used with [`Client::restore_session`] to restore a previously
766 /// logged-in session.
767 pub fn session(&self) -> Option<AuthSession> {
768 match self.auth_api()? {
769 AuthApi::Matrix(api) => api.session().map(Into::into),
770 AuthApi::OAuth(api) => api.full_session().map(Into::into),
771 }
772 }
773
774 /// Get a reference to the state store.
775 pub fn state_store(&self) -> &DynStateStore {
776 self.base_client().state_store()
777 }
778
779 /// Get a reference to the event cache store.
780 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
781 self.base_client().event_cache_store()
782 }
783
784 /// Get a reference to the media store.
785 pub fn media_store(&self) -> &MediaStoreLock {
786 self.base_client().media_store()
787 }
788
789 /// Access the native Matrix authentication API with this client.
790 pub fn matrix_auth(&self) -> MatrixAuth {
791 MatrixAuth::new(self.clone())
792 }
793
794 /// Get the account of the current owner of the client.
795 pub fn account(&self) -> Account {
796 Account::new(self.clone())
797 }
798
799 /// Get the encryption manager of the client.
800 #[cfg(feature = "e2e-encryption")]
801 pub fn encryption(&self) -> Encryption {
802 Encryption::new(self.clone())
803 }
804
805 /// Get the media manager of the client.
806 pub fn media(&self) -> Media {
807 Media::new(self.clone())
808 }
809
810 /// Get the pusher manager of the client.
811 pub fn pusher(&self) -> Pusher {
812 Pusher::new(self.clone())
813 }
814
815 /// Access the OAuth 2.0 API of the client.
816 pub fn oauth(&self) -> OAuth {
817 OAuth::new(self.clone())
818 }
819
820 /// Register a handler for a specific event type.
821 ///
822 /// The handler is a function or closure with one or more arguments. The
823 /// first argument is the event itself. All additional arguments are
824 /// "context" arguments: They have to implement [`EventHandlerContext`].
825 /// This trait is named that way because most of the types implementing it
826 /// give additional context about an event: The room it was in, its raw form
827 /// and other similar things. As two exceptions to this,
828 /// [`Client`] and [`EventHandlerHandle`] also implement the
829 /// `EventHandlerContext` trait so you don't have to clone your client
830 /// into the event handler manually and a handler can decide to remove
831 /// itself.
832 ///
833 /// Some context arguments are not universally applicable. A context
834 /// argument that isn't available for the given event type will result in
835 /// the event handler being skipped and an error being logged. The following
836 /// context argument types are only available for a subset of event types:
837 ///
838 /// * [`Room`] is only available for room-specific events, i.e. not for
839 /// events like global account data events or presence events.
840 ///
841 /// You can provide custom context via
842 /// [`add_event_handler_context`](Client::add_event_handler_context) and
843 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
844 /// into the event handler.
845 ///
846 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
847 ///
848 /// # Examples
849 ///
850 /// ```no_run
851 /// use matrix_sdk::{
852 /// deserialized_responses::EncryptionInfo,
853 /// event_handler::Ctx,
854 /// ruma::{
855 /// events::{
856 /// macros::EventContent,
857 /// push_rules::PushRulesEvent,
858 /// room::{
859 /// message::SyncRoomMessageEvent,
860 /// topic::SyncRoomTopicEvent,
861 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
862 /// },
863 /// },
864 /// push::Action,
865 /// Int, MilliSecondsSinceUnixEpoch,
866 /// },
867 /// Client, Room,
868 /// };
869 /// use serde::{Deserialize, Serialize};
870 ///
871 /// # async fn example(client: Client) {
872 /// client.add_event_handler(
873 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
874 /// // Common usage: Room event plus room and client.
875 /// },
876 /// );
877 /// client.add_event_handler(
878 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
879 /// async move {
880 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
881 /// // unencrypted events and events that were decrypted by the SDK.
882 /// }
883 /// },
884 /// );
885 /// client.add_event_handler(
886 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
887 /// async move {
888 /// // A `Vec<Action>` parameter allows you to know which push actions
889 /// // are applicable for an event. For example, an event with
890 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
891 /// // in the timeline.
892 /// }
893 /// },
894 /// );
895 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
896 /// // You can omit any or all arguments after the first.
897 /// });
898 ///
899 /// // Registering a temporary event handler:
900 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
901 /// /* Event handler */
902 /// });
903 /// client.remove_event_handler(handle);
904 ///
905 /// // Registering custom event handler context:
906 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
907 /// struct MyContext {
908 /// number: usize,
909 /// }
910 /// client.add_event_handler_context(MyContext { number: 5 });
911 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
912 /// // Use the context
913 /// });
914 ///
915 /// // This will handle membership events in joined rooms. Invites are special, see below.
916 /// client.add_event_handler(
917 /// |ev: SyncRoomMemberEvent| async move {},
918 /// );
919 ///
920 /// // To handle state events in invited rooms (including invite membership events),
921 /// // `StrippedRoomMemberEvent` should be used.
922 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
923 /// client.add_event_handler(
924 /// |ev: StrippedRoomMemberEvent| async move {},
925 /// );
926 ///
927 /// // Custom events work exactly the same way, you just need to declare
928 /// // the content struct and use the EventContent derive macro on it.
929 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
930 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
931 /// struct TokenEventContent {
932 /// token: String,
933 /// #[serde(rename = "exp")]
934 /// expires_at: MilliSecondsSinceUnixEpoch,
935 /// }
936 ///
937 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
938 /// todo!("Display the token");
939 /// });
940 ///
941 /// // Event handler closures can also capture local variables.
942 /// // Make sure they are cheap to clone though, because they will be cloned
943 /// // every time the closure is called.
944 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
945 ///
946 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
947 /// println!("Calling the handler with identifier {data}");
948 /// });
949 /// # }
950 /// ```
951 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
952 where
953 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
954 H: EventHandler<Ev, Ctx>,
955 {
956 self.add_event_handler_impl(handler, None)
957 }
958
959 /// Register a handler for a specific room, and event type.
960 ///
961 /// This method works the same way as
962 /// [`add_event_handler`][Self::add_event_handler], except that the handler
963 /// will only be called for events in the room with the specified ID. See
964 /// that method for more details on event handler functions.
965 ///
966 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
967 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
968 /// your use case.
969 pub fn add_room_event_handler<Ev, Ctx, H>(
970 &self,
971 room_id: &RoomId,
972 handler: H,
973 ) -> EventHandlerHandle
974 where
975 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
976 H: EventHandler<Ev, Ctx>,
977 {
978 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
979 }
980
981 /// Observe a specific event type.
982 ///
983 /// `Ev` represents the kind of event that will be observed. `Ctx`
984 /// represents the context that will come with the event. It relies on the
985 /// same mechanism as [`Client::add_event_handler`]. The main difference is
986 /// that it returns an [`ObservableEventHandler`] and doesn't require a
987 /// user-defined closure. It is possible to subscribe to the
988 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
989 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
990 /// Ctx)`.
991 ///
992 /// Be careful that only the most recent value can be observed. Subscribers
993 /// are notified when a new value is sent, but there is no guarantee
994 /// that they will see all values.
995 ///
996 /// # Example
997 ///
998 /// Let's see a classical usage:
999 ///
1000 /// ```
1001 /// use futures_util::StreamExt as _;
1002 /// use matrix_sdk::{
1003 /// Client, Room,
1004 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
1005 /// };
1006 ///
1007 /// # async fn example(client: Client) -> Option<()> {
1008 /// let observer =
1009 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1010 ///
1011 /// let mut subscriber = observer.subscribe();
1012 ///
1013 /// let (event, (room, push_actions)) = subscriber.next().await?;
1014 /// # Some(())
1015 /// # }
1016 /// ```
1017 ///
1018 /// Now let's see how to get several contexts that can be useful for you:
1019 ///
1020 /// ```
1021 /// use matrix_sdk::{
1022 /// Client, Room,
1023 /// deserialized_responses::EncryptionInfo,
1024 /// ruma::{
1025 /// events::room::{
1026 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1027 /// },
1028 /// push::Action,
1029 /// },
1030 /// };
1031 ///
1032 /// # async fn example(client: Client) {
1033 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1034 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1035 ///
1036 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1037 /// // to distinguish between unencrypted events and events that were decrypted
1038 /// // by the SDK.
1039 /// let _ = client
1040 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1041 /// );
1042 ///
1043 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1044 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1045 /// // should be highlighted in the timeline.
1046 /// let _ =
1047 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1048 ///
1049 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1050 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1051 /// # }
1052 /// ```
1053 ///
1054 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1055 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1056 where
1057 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1058 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1059 {
1060 self.observe_room_events_impl(None)
1061 }
1062
1063 /// Observe a specific room, and event type.
1064 ///
1065 /// This method works the same way as [`Client::observe_events`], except
1066 /// that the observability will only be applied for events in the room with
1067 /// the specified ID. See that method for more details.
1068 ///
1069 /// Be careful that only the most recent value can be observed. Subscribers
1070 /// are notified when a new value is sent, but there is no guarantee
1071 /// that they will see all values.
1072 pub fn observe_room_events<Ev, Ctx>(
1073 &self,
1074 room_id: &RoomId,
1075 ) -> ObservableEventHandler<(Ev, Ctx)>
1076 where
1077 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1078 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1079 {
1080 self.observe_room_events_impl(Some(room_id.to_owned()))
1081 }
1082
1083 /// Shared implementation for `Client::observe_events` and
1084 /// `Client::observe_room_events`.
1085 fn observe_room_events_impl<Ev, Ctx>(
1086 &self,
1087 room_id: Option<OwnedRoomId>,
1088 ) -> ObservableEventHandler<(Ev, Ctx)>
1089 where
1090 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1091 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1092 {
1093 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1094 // new value.
1095 let shared_observable = SharedObservable::new(None);
1096
1097 ObservableEventHandler::new(
1098 shared_observable.clone(),
1099 self.event_handler_drop_guard(self.add_event_handler_impl(
1100 move |event: Ev, context: Ctx| {
1101 shared_observable.set(Some((event, context)));
1102
1103 ready(())
1104 },
1105 room_id,
1106 )),
1107 )
1108 }
1109
1110 /// Remove the event handler associated with the handle.
1111 ///
1112 /// Note that you **must not** call `remove_event_handler` from the
1113 /// non-async part of an event handler, that is:
1114 ///
1115 /// ```ignore
1116 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1117 /// // ⚠ this will cause a deadlock ⚠
1118 /// client.remove_event_handler(handle);
1119 ///
1120 /// async move {
1121 /// // removing the event handler here is fine
1122 /// client.remove_event_handler(handle);
1123 /// }
1124 /// })
1125 /// ```
1126 ///
1127 /// Note also that handlers that remove themselves will still execute with
1128 /// events received in the same sync cycle.
1129 ///
1130 /// # Arguments
1131 ///
1132 /// `handle` - The [`EventHandlerHandle`] that is returned when
1133 /// registering the event handler with [`Client::add_event_handler`].
1134 ///
1135 /// # Examples
1136 ///
1137 /// ```no_run
1138 /// # use url::Url;
1139 /// # use tokio::sync::mpsc;
1140 /// #
1141 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1142 /// #
1143 /// use matrix_sdk::{
1144 /// Client, event_handler::EventHandlerHandle,
1145 /// ruma::events::room::member::SyncRoomMemberEvent,
1146 /// };
1147 /// #
1148 /// # futures_executor::block_on(async {
1149 /// # let client = matrix_sdk::Client::builder()
1150 /// # .homeserver_url(homeserver)
1151 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1152 /// # .build()
1153 /// # .await
1154 /// # .unwrap();
1155 ///
1156 /// client.add_event_handler(
1157 /// |ev: SyncRoomMemberEvent,
1158 /// client: Client,
1159 /// handle: EventHandlerHandle| async move {
1160 /// // Common usage: Check arriving Event is the expected one
1161 /// println!("Expected RoomMemberEvent received!");
1162 /// client.remove_event_handler(handle);
1163 /// },
1164 /// );
1165 /// # });
1166 /// ```
1167 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1168 self.inner.event_handlers.remove(handle);
1169 }
1170
1171 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1172 /// the given handle.
1173 ///
1174 /// When the returned value is dropped, the event handler will be removed.
1175 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1176 EventHandlerDropGuard::new(handle, self.clone())
1177 }
1178
1179 /// Add an arbitrary value for use as event handler context.
1180 ///
1181 /// The value can be obtained in an event handler by adding an argument of
1182 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1183 ///
1184 /// If a value of the same type has been added before, it will be
1185 /// overwritten.
1186 ///
1187 /// # Examples
1188 ///
1189 /// ```no_run
1190 /// use matrix_sdk::{
1191 /// Room, event_handler::Ctx,
1192 /// ruma::events::room::message::SyncRoomMessageEvent,
1193 /// };
1194 /// # #[derive(Clone)]
1195 /// # struct SomeType;
1196 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1197 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1198 /// # futures_executor::block_on(async {
1199 /// # let client = matrix_sdk::Client::builder()
1200 /// # .homeserver_url(homeserver)
1201 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1202 /// # .build()
1203 /// # .await
1204 /// # .unwrap();
1205 ///
1206 /// // Handle used to send messages to the UI part of the app
1207 /// let my_gui_handle: SomeType = obtain_gui_handle();
1208 ///
1209 /// client.add_event_handler_context(my_gui_handle.clone());
1210 /// client.add_event_handler(
1211 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1212 /// async move {
1213 /// // gui_handle.send(DisplayMessage { message: ev });
1214 /// }
1215 /// },
1216 /// );
1217 /// # });
1218 /// ```
1219 pub fn add_event_handler_context<T>(&self, ctx: T)
1220 where
1221 T: Clone + Send + Sync + 'static,
1222 {
1223 self.inner.event_handlers.add_context(ctx);
1224 }
1225
1226 /// Register a handler for a notification.
1227 ///
1228 /// Similar to [`Client::add_event_handler`], but only allows functions
1229 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1230 /// [`Client`] for now.
1231 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1232 where
1233 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1234 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1235 {
1236 self.inner.notification_handlers.write().await.push(Box::new(
1237 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1238 ));
1239
1240 self
1241 }
1242
1243 /// Subscribe to all updates for the room with the given ID.
1244 ///
1245 /// The returned receiver will receive a new message for each sync response
1246 /// that contains updates for that room.
1247 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1248 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1249 btree_map::Entry::Vacant(entry) => {
1250 let (tx, rx) = broadcast::channel(8);
1251 entry.insert(tx);
1252 rx
1253 }
1254 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1255 }
1256 }
1257
1258 /// Subscribe to all updates to all rooms, whenever any has been received in
1259 /// a sync response.
1260 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1261 self.inner.room_updates_sender.subscribe()
1262 }
1263
1264 pub(crate) async fn notification_handlers(
1265 &self,
1266 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1267 self.inner.notification_handlers.read().await
1268 }
1269
1270 /// Get all the rooms the client knows about.
1271 ///
1272 /// This will return the list of joined, invited, and left rooms.
1273 pub fn rooms(&self) -> Vec<Room> {
1274 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1275 }
1276
1277 /// Get all the rooms the client knows about, filtered by room state.
1278 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1279 self.base_client()
1280 .rooms_filtered(filter)
1281 .into_iter()
1282 .map(|room| Room::new(self.clone(), room))
1283 .collect()
1284 }
1285
1286 /// Get a stream of all the rooms, in addition to the existing rooms.
1287 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1288 let (rooms, stream) = self.base_client().rooms_stream();
1289
1290 let map_room = |room| Room::new(self.clone(), room);
1291
1292 (
1293 rooms.into_iter().map(map_room).collect(),
1294 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1295 )
1296 }
1297
1298 /// Returns the joined rooms this client knows about.
1299 pub fn joined_rooms(&self) -> Vec<Room> {
1300 self.rooms_filtered(RoomStateFilter::JOINED)
1301 }
1302
1303 /// Returns the invited rooms this client knows about.
1304 pub fn invited_rooms(&self) -> Vec<Room> {
1305 self.rooms_filtered(RoomStateFilter::INVITED)
1306 }
1307
1308 /// Returns the left rooms this client knows about.
1309 pub fn left_rooms(&self) -> Vec<Room> {
1310 self.rooms_filtered(RoomStateFilter::LEFT)
1311 }
1312
1313 /// Returns the joined space rooms this client knows about.
1314 pub fn joined_space_rooms(&self) -> Vec<Room> {
1315 self.base_client()
1316 .rooms_filtered(RoomStateFilter::JOINED)
1317 .into_iter()
1318 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1319 .collect()
1320 }
1321
1322 /// Get a room with the given room id.
1323 ///
1324 /// # Arguments
1325 ///
1326 /// `room_id` - The unique id of the room that should be fetched.
1327 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1328 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1329 }
1330
1331 /// Gets the preview of a room, whether the current user has joined it or
1332 /// not.
1333 pub async fn get_room_preview(
1334 &self,
1335 room_or_alias_id: &RoomOrAliasId,
1336 via: Vec<OwnedServerName>,
1337 ) -> Result<RoomPreview> {
1338 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1339 Ok(room_id) => room_id.to_owned(),
1340 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1341 };
1342
1343 if let Some(room) = self.get_room(&room_id) {
1344 // The cached data can only be trusted if the room state is joined or
1345 // banned: for invite and knock rooms, no updates will be received
1346 // for the rooms after the invite/knock action took place so we may
1347 // have very out to date data for important fields such as
1348 // `join_rule`. For left rooms, the homeserver should return the latest info.
1349 match room.state() {
1350 RoomState::Joined | RoomState::Banned => {
1351 return Ok(RoomPreview::from_known_room(&room).await);
1352 }
1353 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1354 }
1355 }
1356
1357 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1358 }
1359
1360 /// Resolve a room alias to a room id and a list of servers which know
1361 /// about it.
1362 ///
1363 /// # Arguments
1364 ///
1365 /// `room_alias` - The room alias to be resolved.
1366 pub async fn resolve_room_alias(
1367 &self,
1368 room_alias: &RoomAliasId,
1369 ) -> HttpResult<get_alias::v3::Response> {
1370 let request = get_alias::v3::Request::new(room_alias.to_owned());
1371 self.send(request).await
1372 }
1373
1374 /// Checks if a room alias is not in use yet.
1375 ///
1376 /// Returns:
1377 /// - `Ok(true)` if the room alias is available.
1378 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1379 /// status code).
1380 /// - An `Err` otherwise.
1381 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1382 match self.resolve_room_alias(alias).await {
1383 // The room alias was resolved, so it's already in use.
1384 Ok(_) => Ok(false),
1385 Err(error) => {
1386 match error.client_api_error_kind() {
1387 // The room alias wasn't found, so it's available.
1388 Some(ErrorKind::NotFound) => Ok(true),
1389 _ => Err(error),
1390 }
1391 }
1392 }
1393 }
1394
1395 /// Adds a new room alias associated with a room to the room directory.
1396 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1397 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1398 self.send(request).await?;
1399 Ok(())
1400 }
1401
1402 /// Removes a room alias from the room directory.
1403 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1404 let request = delete_alias::v3::Request::new(alias.to_owned());
1405 self.send(request).await?;
1406 Ok(())
1407 }
1408
1409 /// Update the homeserver from the login response well-known if needed.
1410 ///
1411 /// # Arguments
1412 ///
1413 /// * `login_well_known` - The `well_known` field from a successful login
1414 /// response.
1415 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1416 if self.inner.respect_login_well_known
1417 && let Some(well_known) = login_well_known
1418 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1419 {
1420 self.set_homeserver(homeserver);
1421 }
1422 }
1423
1424 /// Similar to [`Client::restore_session_with`], with
1425 /// [`RoomLoadSettings::default()`].
1426 ///
1427 /// # Panics
1428 ///
1429 /// Panics if a session was already restored or logged in.
1430 #[instrument(skip_all)]
1431 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1432 self.restore_session_with(session, RoomLoadSettings::default()).await
1433 }
1434
1435 /// Restore a session previously logged-in using one of the available
1436 /// authentication APIs. The number of rooms to restore is controlled by
1437 /// [`RoomLoadSettings`].
1438 ///
1439 /// See the documentation of the corresponding authentication API's
1440 /// `restore_session` method for more information.
1441 ///
1442 /// # Panics
1443 ///
1444 /// Panics if a session was already restored or logged in.
1445 #[instrument(skip_all)]
1446 pub async fn restore_session_with(
1447 &self,
1448 session: impl Into<AuthSession>,
1449 room_load_settings: RoomLoadSettings,
1450 ) -> Result<()> {
1451 let session = session.into();
1452 match session {
1453 AuthSession::Matrix(session) => {
1454 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1455 }
1456 AuthSession::OAuth(session) => {
1457 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1458 }
1459 }
1460 }
1461
1462 /// Refresh the access token using the authentication API used to log into
1463 /// this session.
1464 ///
1465 /// See the documentation of the authentication API's `refresh_access_token`
1466 /// method for more information.
1467 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1468 let Some(auth_api) = self.auth_api() else {
1469 return Err(RefreshTokenError::RefreshTokenRequired);
1470 };
1471
1472 match auth_api {
1473 AuthApi::Matrix(api) => {
1474 trace!("Token refresh: Using the homeserver.");
1475 Box::pin(api.refresh_access_token()).await?;
1476 }
1477 AuthApi::OAuth(api) => {
1478 trace!("Token refresh: Using OAuth 2.0.");
1479 Box::pin(api.refresh_access_token()).await?;
1480 }
1481 }
1482
1483 Ok(())
1484 }
1485
1486 /// Log out the current session using the proper authentication API.
1487 ///
1488 /// # Errors
1489 ///
1490 /// Returns an error if the session is not authenticated or if an error
1491 /// occurred while making the request to the server.
1492 pub async fn logout(&self) -> Result<(), Error> {
1493 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1494 match auth_api {
1495 AuthApi::Matrix(matrix_auth) => {
1496 matrix_auth.logout().await?;
1497 Ok(())
1498 }
1499 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1500 }
1501 }
1502
1503 /// Get or upload a sync filter.
1504 ///
1505 /// This method will either get a filter ID from the store or upload the
1506 /// filter definition to the homeserver and return the new filter ID.
1507 ///
1508 /// # Arguments
1509 ///
1510 /// * `filter_name` - The unique name of the filter, this name will be used
1511 /// locally to store and identify the filter ID returned by the server.
1512 ///
1513 /// * `definition` - The filter definition that should be uploaded to the
1514 /// server if no filter ID can be found in the store.
1515 ///
1516 /// # Examples
1517 ///
1518 /// ```no_run
1519 /// # use matrix_sdk::{
1520 /// # Client, config::SyncSettings,
1521 /// # ruma::api::client::{
1522 /// # filter::{
1523 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1524 /// # },
1525 /// # sync::sync_events::v3::Filter,
1526 /// # }
1527 /// # };
1528 /// # use url::Url;
1529 /// # async {
1530 /// # let homeserver = Url::parse("http://example.com").unwrap();
1531 /// # let client = Client::new(homeserver).await.unwrap();
1532 /// let mut filter = FilterDefinition::default();
1533 ///
1534 /// // Let's enable member lazy loading.
1535 /// filter.room.state.lazy_load_options =
1536 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1537 ///
1538 /// let filter_id = client
1539 /// .get_or_upload_filter("sync", filter)
1540 /// .await
1541 /// .unwrap();
1542 ///
1543 /// let sync_settings = SyncSettings::new()
1544 /// .filter(Filter::FilterId(filter_id));
1545 ///
1546 /// let response = client.sync_once(sync_settings).await.unwrap();
1547 /// # };
1548 #[instrument(skip(self, definition))]
1549 pub async fn get_or_upload_filter(
1550 &self,
1551 filter_name: &str,
1552 definition: FilterDefinition,
1553 ) -> Result<String> {
1554 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1555 debug!("Found filter locally");
1556 Ok(filter)
1557 } else {
1558 debug!("Didn't find filter locally");
1559 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1560 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1561 let response = self.send(request).await?;
1562
1563 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1564
1565 Ok(response.filter_id)
1566 }
1567 }
1568
1569 /// Prepare to join a room by ID, by getting the current details about it
1570 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1571 let room = self.get_room(room_id)?;
1572
1573 let inviter = match room.invite_details().await {
1574 Ok(details) => details.inviter,
1575 Err(Error::WrongRoomState(_)) => None,
1576 Err(e) => {
1577 warn!("Error fetching invite details for room: {e:?}");
1578 None
1579 }
1580 };
1581
1582 Some(PreJoinRoomInfo { inviter })
1583 }
1584
1585 /// Finish joining a room.
1586 ///
1587 /// If the room was an invite that should be marked as a DM, will include it
1588 /// in the DM event after creating the joined room.
1589 ///
1590 /// If encrypted history sharing is enabled, will check to see if we have a
1591 /// key bundle, and import it if so.
1592 ///
1593 /// # Arguments
1594 ///
1595 /// * `room_id` - The `RoomId` of the room that was joined.
1596 /// * `pre_join_room_info` - Information about the room before we joined.
1597 async fn finish_join_room(
1598 &self,
1599 room_id: &RoomId,
1600 pre_join_room_info: Option<PreJoinRoomInfo>,
1601 ) -> Result<Room> {
1602 info!(?room_id, ?pre_join_room_info, "Completing room join");
1603 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1604 room.state() == RoomState::Invited
1605 && room.is_direct().await.unwrap_or_else(|e| {
1606 warn!(%room_id, "is_direct() failed: {e}");
1607 false
1608 })
1609 } else {
1610 false
1611 };
1612
1613 let base_room = self
1614 .base_client()
1615 .room_joined(
1616 room_id,
1617 pre_join_room_info
1618 .as_ref()
1619 .and_then(|info| info.inviter.as_ref())
1620 .map(|i| i.user_id().to_owned()),
1621 )
1622 .await?;
1623 let room = Room::new(self.clone(), base_room);
1624
1625 if mark_as_dm {
1626 room.set_is_direct(true).await?;
1627 }
1628
1629 // If we joined following an invite, check if we had previously received a key
1630 // bundle from the inviter, and import it if so.
1631 //
1632 // It's important that we only do this once `BaseClient::room_joined` has
1633 // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1634 // race.
1635 #[cfg(feature = "e2e-encryption")]
1636 if self.inner.enable_share_history_on_invite
1637 && let Some(inviter) =
1638 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1639 {
1640 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1641 .await?;
1642 }
1643
1644 // Suppress "unused variable" and "unused field" lints
1645 #[cfg(not(feature = "e2e-encryption"))]
1646 let _ = pre_join_room_info.map(|i| i.inviter);
1647
1648 Ok(room)
1649 }
1650
1651 /// Join a room by `RoomId`.
1652 ///
1653 /// Returns the `Room` in the joined state.
1654 ///
1655 /// # Arguments
1656 ///
1657 /// * `room_id` - The `RoomId` of the room to be joined.
1658 #[instrument(skip(self))]
1659 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1660 // See who invited us to this room, if anyone. Note we have to do this before
1661 // making the `/join` request, otherwise we could race against the sync.
1662 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1663
1664 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1665 let response = self.send(request).await?;
1666 self.finish_join_room(&response.room_id, pre_join_info).await
1667 }
1668
1669 /// Join a room by `RoomOrAliasId`.
1670 ///
1671 /// Returns the `Room` in the joined state.
1672 ///
1673 /// # Arguments
1674 ///
1675 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1676 /// alias looks like `#name:example.com`.
1677 /// * `server_names` - The server names to be used for resolving the alias,
1678 /// if needs be.
1679 #[instrument(skip(self))]
1680 pub async fn join_room_by_id_or_alias(
1681 &self,
1682 alias: &RoomOrAliasId,
1683 server_names: &[OwnedServerName],
1684 ) -> Result<Room> {
1685 let pre_join_info = {
1686 match alias.try_into() {
1687 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1688 Err(_) => {
1689 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1690 // responding to an invitation to the room, and therefore don't need to handle
1691 // things that happen as a result of invites.
1692 None
1693 }
1694 }
1695 };
1696 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1697 via: server_names.to_owned(),
1698 });
1699 let response = self.send(request).await?;
1700 self.finish_join_room(&response.room_id, pre_join_info).await
1701 }
1702
1703 /// Search the homeserver's directory of public rooms.
1704 ///
1705 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1706 /// a `get_public_rooms::Response`.
1707 ///
1708 /// # Arguments
1709 ///
1710 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1711 ///
1712 /// * `since` - Pagination token from a previous request.
1713 ///
1714 /// * `server` - The name of the server, if `None` the requested server is
1715 /// used.
1716 ///
1717 /// # Examples
1718 /// ```no_run
1719 /// use matrix_sdk::Client;
1720 /// # use url::Url;
1721 /// # let homeserver = Url::parse("http://example.com").unwrap();
1722 /// # let limit = Some(10);
1723 /// # let since = Some("since token");
1724 /// # let server = Some("servername.com".try_into().unwrap());
1725 /// # async {
1726 /// let mut client = Client::new(homeserver).await.unwrap();
1727 ///
1728 /// client.public_rooms(limit, since, server).await;
1729 /// # };
1730 /// ```
1731 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1732 pub async fn public_rooms(
1733 &self,
1734 limit: Option<u32>,
1735 since: Option<&str>,
1736 server: Option<&ServerName>,
1737 ) -> HttpResult<get_public_rooms::v3::Response> {
1738 let limit = limit.map(UInt::from);
1739
1740 let request = assign!(get_public_rooms::v3::Request::new(), {
1741 limit,
1742 since: since.map(ToOwned::to_owned),
1743 server: server.map(ToOwned::to_owned),
1744 });
1745 self.send(request).await
1746 }
1747
1748 /// Create a room with the given parameters.
1749 ///
1750 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1751 /// created room.
1752 ///
1753 /// If you want to create a direct message with one specific user, you can
1754 /// use [`create_dm`][Self::create_dm], which is more convenient than
1755 /// assembling the [`create_room::v3::Request`] yourself.
1756 ///
1757 /// If the `is_direct` field of the request is set to `true` and at least
1758 /// one user is invited, the room will be automatically added to the direct
1759 /// rooms in the account data.
1760 ///
1761 /// # Examples
1762 ///
1763 /// ```no_run
1764 /// use matrix_sdk::{
1765 /// Client,
1766 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1767 /// };
1768 /// # use url::Url;
1769 /// #
1770 /// # async {
1771 /// # let homeserver = Url::parse("http://example.com").unwrap();
1772 /// let request = CreateRoomRequest::new();
1773 /// let client = Client::new(homeserver).await.unwrap();
1774 /// assert!(client.create_room(request).await.is_ok());
1775 /// # };
1776 /// ```
1777 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1778 let invite = request.invite.clone();
1779 let is_direct_room = request.is_direct;
1780 let response = self.send(request).await?;
1781
1782 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1783
1784 let joined_room = Room::new(self.clone(), base_room);
1785
1786 if is_direct_room
1787 && !invite.is_empty()
1788 && let Err(error) =
1789 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1790 {
1791 // FIXME: Retry in the background
1792 error!("Failed to mark room as DM: {error}");
1793 }
1794
1795 Ok(joined_room)
1796 }
1797
1798 /// Create a DM room.
1799 ///
1800 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1801 /// given user being invited, the room marked `is_direct` and both the
1802 /// creator and invitee getting the default maximum power level.
1803 ///
1804 /// If the `e2e-encryption` feature is enabled, the room will also be
1805 /// encrypted.
1806 ///
1807 /// # Arguments
1808 ///
1809 /// * `user_id` - The ID of the user to create a DM for.
1810 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1811 #[cfg(feature = "e2e-encryption")]
1812 let initial_state = vec![
1813 InitialStateEvent::with_empty_state_key(
1814 RoomEncryptionEventContent::with_recommended_defaults(),
1815 )
1816 .to_raw_any(),
1817 ];
1818
1819 #[cfg(not(feature = "e2e-encryption"))]
1820 let initial_state = vec![];
1821
1822 let request = assign!(create_room::v3::Request::new(), {
1823 invite: vec![user_id.to_owned()],
1824 is_direct: true,
1825 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1826 initial_state,
1827 });
1828
1829 self.create_room(request).await
1830 }
1831
1832 /// Get the existing DM room with the given user, if any.
1833 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1834 let rooms = self.joined_rooms();
1835
1836 // Find the room we share with the `user_id` and only with `user_id`
1837 let room = rooms.into_iter().find(|r| {
1838 let targets = r.direct_targets();
1839 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1840 });
1841
1842 trace!(?user_id, ?room, "Found DM room with user");
1843 room
1844 }
1845
1846 /// Search the homeserver's directory for public rooms with a filter.
1847 ///
1848 /// # Arguments
1849 ///
1850 /// * `room_search` - The easiest way to create this request is using the
1851 /// `get_public_rooms_filtered::Request` itself.
1852 ///
1853 /// # Examples
1854 ///
1855 /// ```no_run
1856 /// # use url::Url;
1857 /// # use matrix_sdk::Client;
1858 /// # async {
1859 /// # let homeserver = Url::parse("http://example.com")?;
1860 /// use matrix_sdk::ruma::{
1861 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1862 /// };
1863 /// # let mut client = Client::new(homeserver).await?;
1864 ///
1865 /// let mut filter = Filter::new();
1866 /// filter.generic_search_term = Some("rust".to_owned());
1867 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1868 /// request.filter = filter;
1869 ///
1870 /// let response = client.public_rooms_filtered(request).await?;
1871 ///
1872 /// for room in response.chunk {
1873 /// println!("Found room {room:?}");
1874 /// }
1875 /// # anyhow::Ok(()) };
1876 /// ```
1877 pub async fn public_rooms_filtered(
1878 &self,
1879 request: get_public_rooms_filtered::v3::Request,
1880 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1881 self.send(request).await
1882 }
1883
1884 /// Send an arbitrary request to the server, without updating client state.
1885 ///
1886 /// **Warning:** Because this method *does not* update the client state, it
1887 /// is important to make sure that you account for this yourself, and
1888 /// use wrapper methods where available. This method should *only* be
1889 /// used if a wrapper method for the endpoint you'd like to use is not
1890 /// available.
1891 ///
1892 /// # Arguments
1893 ///
1894 /// * `request` - A filled out and valid request for the endpoint to be hit
1895 ///
1896 /// * `timeout` - An optional request timeout setting, this overrides the
1897 /// default request setting if one was set.
1898 ///
1899 /// # Examples
1900 ///
1901 /// ```no_run
1902 /// # use matrix_sdk::{Client, config::SyncSettings};
1903 /// # use url::Url;
1904 /// # async {
1905 /// # let homeserver = Url::parse("http://localhost:8080")?;
1906 /// # let mut client = Client::new(homeserver).await?;
1907 /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1908 ///
1909 /// // First construct the request you want to make
1910 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1911 /// // for all available Endpoints
1912 /// let user_id = owned_user_id!("@example:localhost");
1913 /// let request = profile::get_profile::v3::Request::new(user_id);
1914 ///
1915 /// // Start the request using Client::send()
1916 /// let response = client.send(request).await?;
1917 ///
1918 /// // Check the corresponding Response struct to find out what types are
1919 /// // returned
1920 /// # anyhow::Ok(()) };
1921 /// ```
1922 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1923 where
1924 Request: OutgoingRequest + Clone + Debug,
1925 Request::Authentication: SupportedAuthScheme,
1926 Request::PathBuilder: SupportedPathBuilder,
1927 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1928 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1929 {
1930 SendRequest {
1931 client: self.clone(),
1932 request,
1933 config: None,
1934 send_progress: Default::default(),
1935 }
1936 }
1937
1938 pub(crate) async fn send_inner<Request>(
1939 &self,
1940 request: Request,
1941 config: Option<RequestConfig>,
1942 send_progress: SharedObservable<TransmissionProgress>,
1943 ) -> HttpResult<Request::IncomingResponse>
1944 where
1945 Request: OutgoingRequest + Debug,
1946 Request::Authentication: SupportedAuthScheme,
1947 Request::PathBuilder: SupportedPathBuilder,
1948 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1949 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1950 {
1951 let homeserver = self.homeserver().to_string();
1952 let access_token = self.access_token();
1953 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1954
1955 let path_builder_input =
1956 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1957
1958 let result = self
1959 .inner
1960 .http_client
1961 .send(
1962 request,
1963 config,
1964 homeserver,
1965 access_token.as_deref(),
1966 path_builder_input,
1967 send_progress,
1968 )
1969 .await;
1970
1971 if let Err(Some(ErrorKind::UnknownToken { .. })) =
1972 result.as_ref().map_err(HttpError::client_api_error_kind)
1973 && let Some(access_token) = &access_token
1974 {
1975 // Mark the access token as expired.
1976 self.auth_ctx().set_access_token_expired(access_token);
1977 }
1978
1979 result
1980 }
1981
1982 fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
1983 _ = self
1984 .inner
1985 .auth_ctx
1986 .session_change_sender
1987 .send(SessionChange::UnknownToken(unknown_token_data.clone()));
1988 }
1989
1990 /// Fetches server versions from network; no caching.
1991 pub async fn fetch_server_versions(
1992 &self,
1993 request_config: Option<RequestConfig>,
1994 ) -> HttpResult<get_supported_versions::Response> {
1995 // Since this was called by the user, try to refresh the access token if
1996 // necessary.
1997 self.fetch_server_versions_inner(false, request_config).await
1998 }
1999
2000 /// Fetches server versions from network; no caching.
2001 ///
2002 /// If the access token is expired and `failsafe` is `false`, this will
2003 /// attempt to refresh the access token, otherwise this will try to make an
2004 /// unauthenticated request instead.
2005 pub(crate) async fn fetch_server_versions_inner(
2006 &self,
2007 failsafe: bool,
2008 request_config: Option<RequestConfig>,
2009 ) -> HttpResult<get_supported_versions::Response> {
2010 if !failsafe {
2011 // `Client::send()` handles refreshing access tokens.
2012 return self
2013 .send(get_supported_versions::Request::new())
2014 .with_request_config(request_config)
2015 .await;
2016 }
2017
2018 let homeserver = self.homeserver().to_string();
2019
2020 // If we have a fresh access token, try with it first.
2021 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2022 && self.auth_ctx().has_valid_access_token()
2023 && let Some(access_token) = self.access_token()
2024 {
2025 let result = self
2026 .inner
2027 .http_client
2028 .send(
2029 get_supported_versions::Request::new(),
2030 request_config,
2031 homeserver.clone(),
2032 Some(&access_token),
2033 (),
2034 Default::default(),
2035 )
2036 .await;
2037
2038 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2039 result.as_ref().map_err(HttpError::client_api_error_kind)
2040 {
2041 // If the access token is actually expired, mark it as expired and fallback to
2042 // the unauthenticated request below.
2043 self.auth_ctx().set_access_token_expired(&access_token);
2044 } else {
2045 // If the request succeeded or it's an other error, just stop now.
2046 return result;
2047 }
2048 }
2049
2050 // Try without authentication.
2051 self.inner
2052 .http_client
2053 .send(
2054 get_supported_versions::Request::new(),
2055 request_config,
2056 homeserver.clone(),
2057 None,
2058 (),
2059 Default::default(),
2060 )
2061 .await
2062 }
2063
2064 /// Fetches client well_known from network; no caching.
2065 ///
2066 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2067 /// well-known contents.
2068 /// 2. If it's not, we try extracting the server name from the
2069 /// [`Client::user_id`] and building the server URL from it.
2070 /// 3. If we couldn't get the well-known contents with either the explicit
2071 /// server name or the implicit extracted one, we try the homeserver URL
2072 /// as a last resort.
2073 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2074 let homeserver = self.homeserver();
2075 let scheme = homeserver.scheme();
2076
2077 // Use the server name, either an explicit one or an implicit one taken from
2078 // the user id: sometimes we'll have only the homeserver url available and no
2079 // server name, but the server name can be extracted from the current user id.
2080 let server_url = self
2081 .server()
2082 .map(|server| server.to_string())
2083 // If the server name wasn't available, extract it from the user id and build a URL:
2084 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2085 // will be the same for the public server url, lacking a better candidate.
2086 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2087
2088 // If the server name is available, first try using it
2089 let response = if let Some(server_url) = server_url {
2090 // First try using the server name
2091 self.fetch_client_well_known_with_url(server_url).await
2092 } else {
2093 None
2094 };
2095
2096 // If we didn't get a well-known value yet, try with the homeserver url instead:
2097 if response.is_none() {
2098 // Sometimes people configure their well-known directly on the homeserver so use
2099 // this as a fallback when the server name is unknown.
2100 warn!(
2101 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2102 );
2103 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2104 } else {
2105 response
2106 }
2107 }
2108
2109 async fn fetch_client_well_known_with_url(
2110 &self,
2111 url: String,
2112 ) -> Option<discover_homeserver::Response> {
2113 let well_known = self
2114 .inner
2115 .http_client
2116 .send(
2117 discover_homeserver::Request::new(),
2118 Some(RequestConfig::short_retry()),
2119 url,
2120 None,
2121 (),
2122 Default::default(),
2123 )
2124 .await;
2125
2126 match well_known {
2127 Ok(well_known) => Some(well_known),
2128 Err(http_error) => {
2129 // It is perfectly valid to not have a well-known file.
2130 // Maybe we should check for a specific error code to be sure?
2131 warn!("Failed to fetch client well-known: {http_error}");
2132 None
2133 }
2134 }
2135 }
2136
2137 /// Load supported versions from storage, or fetch them from network and
2138 /// cache them.
2139 ///
2140 /// If `failsafe` is true, this will try to minimize side effects to avoid
2141 /// possible deadlocks.
2142 async fn load_or_fetch_supported_versions(
2143 &self,
2144 failsafe: bool,
2145 ) -> HttpResult<SupportedVersionsResponse> {
2146 match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2147 Ok(Some(stored)) => {
2148 if let Some(supported_versions) =
2149 stored.into_supported_versions().and_then(|value| value.into_data())
2150 {
2151 return Ok(supported_versions);
2152 }
2153 }
2154 Ok(None) => {
2155 // fallthrough: cache is empty
2156 }
2157 Err(err) => {
2158 warn!("error when loading cached supported versions: {err}");
2159 // fallthrough to network.
2160 }
2161 }
2162
2163 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2164 let supported_versions = SupportedVersionsResponse {
2165 versions: server_versions.versions,
2166 unstable_features: server_versions.unstable_features,
2167 };
2168
2169 // Only attempt to cache the result in storage if the request was authenticated.
2170 if self.auth_ctx().has_valid_access_token()
2171 && let Err(err) = self
2172 .state_store()
2173 .set_kv_data(
2174 StateStoreDataKey::SupportedVersions,
2175 StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2176 supported_versions.clone(),
2177 )),
2178 )
2179 .await
2180 {
2181 warn!("error when caching supported versions: {err}");
2182 }
2183
2184 Ok(supported_versions)
2185 }
2186
2187 pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2188 let supported_versions = &self.inner.caches.supported_versions;
2189
2190 if let CachedValue::Cached(val) = &*supported_versions.read().await {
2191 Some(val.clone())
2192 } else {
2193 None
2194 }
2195 }
2196
2197 /// Get the Matrix versions and features supported by the homeserver by
2198 /// fetching them from the server or the cache.
2199 ///
2200 /// This is equivalent to calling both [`Client::server_versions()`] and
2201 /// [`Client::unstable_features()`]. To always fetch the result from the
2202 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2203 /// and then `.as_supported_versions()` on the response.
2204 ///
2205 /// # Examples
2206 ///
2207 /// ```no_run
2208 /// use ruma::api::{FeatureFlag, MatrixVersion};
2209 /// # use matrix_sdk::{Client, config::SyncSettings};
2210 /// # use url::Url;
2211 /// # async {
2212 /// # let homeserver = Url::parse("http://localhost:8080")?;
2213 /// # let mut client = Client::new(homeserver).await?;
2214 ///
2215 /// let supported = client.supported_versions().await?;
2216 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2217 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2218 ///
2219 /// let msc_x_feature = FeatureFlag::from("msc_x");
2220 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2221 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2222 /// # anyhow::Ok(()) };
2223 /// ```
2224 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2225 self.supported_versions_inner(false).await
2226 }
2227
2228 /// Get the Matrix versions and features supported by the homeserver by
2229 /// fetching them from the server or the cache.
2230 ///
2231 /// If `failsafe` is true, this will try to minimize side effects to avoid
2232 /// possible deadlocks.
2233 pub(crate) async fn supported_versions_inner(
2234 &self,
2235 failsafe: bool,
2236 ) -> HttpResult<SupportedVersions> {
2237 let cached_supported_versions = &self.inner.caches.supported_versions;
2238 if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2239 return Ok(val.clone());
2240 }
2241
2242 let mut guarded_supported_versions = cached_supported_versions.write().await;
2243 if let CachedValue::Cached(val) = &*guarded_supported_versions {
2244 return Ok(val.clone());
2245 }
2246
2247 let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2248
2249 // Fill both unstable features and server versions at once.
2250 let mut supported_versions = supported.supported_versions();
2251 if supported_versions.versions.is_empty() {
2252 supported_versions.versions = [MatrixVersion::V1_0].into();
2253 }
2254
2255 // Only cache the result if the request was authenticated.
2256 if self.auth_ctx().has_valid_access_token() {
2257 *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2258 }
2259
2260 Ok(supported_versions)
2261 }
2262
2263 /// Get the Matrix versions and features supported by the homeserver by
2264 /// fetching them from the cache.
2265 ///
2266 /// For a version of this function that fetches the supported versions and
2267 /// features from the homeserver if the [`SupportedVersions`] aren't
2268 /// found in the cache, take a look at the [`Client::server_versions()`]
2269 /// method.
2270 ///
2271 /// # Examples
2272 ///
2273 /// ```no_run
2274 /// use ruma::api::{FeatureFlag, MatrixVersion};
2275 /// # use matrix_sdk::{Client, config::SyncSettings};
2276 /// # use url::Url;
2277 /// # async {
2278 /// # let homeserver = Url::parse("http://localhost:8080")?;
2279 /// # let mut client = Client::new(homeserver).await?;
2280 ///
2281 /// let supported =
2282 /// if let Some(supported) = client.supported_versions_cached().await? {
2283 /// supported
2284 /// } else {
2285 /// client.fetch_server_versions(None).await?.as_supported_versions()
2286 /// };
2287 ///
2288 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2289 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2290 ///
2291 /// let msc_x_feature = FeatureFlag::from("msc_x");
2292 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2293 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2294 /// # anyhow::Ok(()) };
2295 /// ```
2296 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2297 if let Some(cached) = self.get_cached_supported_versions().await {
2298 Ok(Some(cached))
2299 } else if let Some(stored) =
2300 self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2301 {
2302 if let Some(supported) =
2303 stored.into_supported_versions().and_then(|value| value.into_data())
2304 {
2305 Ok(Some(supported.supported_versions()))
2306 } else {
2307 Ok(None)
2308 }
2309 } else {
2310 Ok(None)
2311 }
2312 }
2313
2314 /// Get the Matrix versions supported by the homeserver by fetching them
2315 /// from the server or the cache.
2316 ///
2317 /// # Examples
2318 ///
2319 /// ```no_run
2320 /// use ruma::api::MatrixVersion;
2321 /// # use matrix_sdk::{Client, config::SyncSettings};
2322 /// # use url::Url;
2323 /// # async {
2324 /// # let homeserver = Url::parse("http://localhost:8080")?;
2325 /// # let mut client = Client::new(homeserver).await?;
2326 ///
2327 /// let server_versions = client.server_versions().await?;
2328 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2329 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2330 /// # anyhow::Ok(()) };
2331 /// ```
2332 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2333 Ok(self.supported_versions().await?.versions)
2334 }
2335
2336 /// Get the unstable features supported by the homeserver by fetching them
2337 /// from the server or the cache.
2338 ///
2339 /// # Examples
2340 ///
2341 /// ```no_run
2342 /// use matrix_sdk::ruma::api::FeatureFlag;
2343 /// # use matrix_sdk::{Client, config::SyncSettings};
2344 /// # use url::Url;
2345 /// # async {
2346 /// # let homeserver = Url::parse("http://localhost:8080")?;
2347 /// # let mut client = Client::new(homeserver).await?;
2348 ///
2349 /// let msc_x_feature = FeatureFlag::from("msc_x");
2350 /// let unstable_features = client.unstable_features().await?;
2351 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2352 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2353 /// # anyhow::Ok(()) };
2354 /// ```
2355 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2356 Ok(self.supported_versions().await?.features)
2357 }
2358
2359 /// Empty the supported versions and unstable features cache.
2360 ///
2361 /// Since the SDK caches the supported versions, it's possible to have a
2362 /// stale entry in the cache. This functions makes it possible to force
2363 /// reset it.
2364 pub async fn reset_supported_versions(&self) -> Result<()> {
2365 // Empty the in-memory caches.
2366 self.inner.caches.supported_versions.write().await.take();
2367
2368 // Empty the store cache.
2369 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2370 }
2371
2372 /// Load well-known from storage, or fetch it from network and cache it.
2373 async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2374 match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2375 Ok(Some(stored)) => {
2376 if let Some(well_known) =
2377 stored.into_well_known().and_then(|value| value.into_data())
2378 {
2379 return Ok(well_known);
2380 }
2381 }
2382 Ok(None) => {
2383 // fallthrough: cache is empty
2384 }
2385 Err(err) => {
2386 warn!("error when loading cached well-known: {err}");
2387 // fallthrough to network.
2388 }
2389 }
2390
2391 let well_known = self.fetch_client_well_known().await.map(Into::into);
2392
2393 // Attempt to cache the result in storage.
2394 if let Err(err) = self
2395 .state_store()
2396 .set_kv_data(
2397 StateStoreDataKey::WellKnown,
2398 StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2399 )
2400 .await
2401 {
2402 warn!("error when caching well-known: {err}");
2403 }
2404
2405 Ok(well_known)
2406 }
2407
2408 async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2409 let well_known = &self.inner.caches.well_known;
2410 if let CachedValue::Cached(val) = &*well_known.read().await {
2411 return Ok(val.clone());
2412 }
2413
2414 let mut guarded_well_known = well_known.write().await;
2415 if let CachedValue::Cached(val) = &*guarded_well_known {
2416 return Ok(val.clone());
2417 }
2418
2419 let well_known = self.load_or_fetch_well_known().await?;
2420
2421 *guarded_well_known = CachedValue::Cached(well_known.clone());
2422
2423 Ok(well_known)
2424 }
2425
2426 /// Get information about the homeserver's advertised RTC foci by fetching
2427 /// the well-known file from the server or the cache.
2428 ///
2429 /// # Examples
2430 /// ```no_run
2431 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2432 /// # use url::Url;
2433 /// # async {
2434 /// # let homeserver = Url::parse("http://localhost:8080")?;
2435 /// # let mut client = Client::new(homeserver).await?;
2436 /// let rtc_foci = client.rtc_foci().await?;
2437 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2438 /// RtcFocusInfo::LiveKit(info) => Some(info),
2439 /// _ => None,
2440 /// });
2441 /// if let Some(info) = default_livekit_focus_info {
2442 /// println!("Default LiveKit service URL: {}", info.service_url);
2443 /// }
2444 /// # anyhow::Ok(()) };
2445 /// ```
2446 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2447 let well_known = self.get_or_load_and_cache_well_known().await?;
2448
2449 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2450 }
2451
2452 /// Empty the well-known cache.
2453 ///
2454 /// Since the SDK caches the well-known, it's possible to have a stale entry
2455 /// in the cache. This functions makes it possible to force reset it.
2456 pub async fn reset_well_known(&self) -> Result<()> {
2457 // Empty the in-memory caches.
2458 self.inner.caches.well_known.write().await.take();
2459
2460 // Empty the store cache.
2461 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2462 }
2463
2464 /// Check whether MSC 4028 is enabled on the homeserver.
2465 ///
2466 /// # Examples
2467 ///
2468 /// ```no_run
2469 /// # use matrix_sdk::{Client, config::SyncSettings};
2470 /// # use url::Url;
2471 /// # async {
2472 /// # let homeserver = Url::parse("http://localhost:8080")?;
2473 /// # let mut client = Client::new(homeserver).await?;
2474 /// let msc4028_enabled =
2475 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2476 /// # anyhow::Ok(()) };
2477 /// ```
2478 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2479 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2480 }
2481
2482 /// Get information of all our own devices.
2483 ///
2484 /// # Examples
2485 ///
2486 /// ```no_run
2487 /// # use matrix_sdk::{Client, config::SyncSettings};
2488 /// # use url::Url;
2489 /// # async {
2490 /// # let homeserver = Url::parse("http://localhost:8080")?;
2491 /// # let mut client = Client::new(homeserver).await?;
2492 /// let response = client.devices().await?;
2493 ///
2494 /// for device in response.devices {
2495 /// println!(
2496 /// "Device: {} {}",
2497 /// device.device_id,
2498 /// device.display_name.as_deref().unwrap_or("")
2499 /// );
2500 /// }
2501 /// # anyhow::Ok(()) };
2502 /// ```
2503 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2504 let request = get_devices::v3::Request::new();
2505
2506 self.send(request).await
2507 }
2508
2509 /// Delete the given devices from the server.
2510 ///
2511 /// # Arguments
2512 ///
2513 /// * `devices` - The list of devices that should be deleted from the
2514 /// server.
2515 ///
2516 /// * `auth_data` - This request requires user interactive auth, the first
2517 /// request needs to set this to `None` and will always fail with an
2518 /// `UiaaResponse`. The response will contain information for the
2519 /// interactive auth and the same request needs to be made but this time
2520 /// with some `auth_data` provided.
2521 ///
2522 /// ```no_run
2523 /// # use matrix_sdk::{
2524 /// # ruma::{api::client::uiaa, owned_device_id},
2525 /// # Client, Error, config::SyncSettings,
2526 /// # };
2527 /// # use serde_json::json;
2528 /// # use url::Url;
2529 /// # use std::collections::BTreeMap;
2530 /// # async {
2531 /// # let homeserver = Url::parse("http://localhost:8080")?;
2532 /// # let mut client = Client::new(homeserver).await?;
2533 /// let devices = &[owned_device_id!("DEVICEID")];
2534 ///
2535 /// if let Err(e) = client.delete_devices(devices, None).await {
2536 /// if let Some(info) = e.as_uiaa_response() {
2537 /// let mut password = uiaa::Password::new(
2538 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2539 /// "wordpass".to_owned(),
2540 /// );
2541 /// password.session = info.session.clone();
2542 ///
2543 /// client
2544 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2545 /// .await?;
2546 /// }
2547 /// }
2548 /// # anyhow::Ok(()) };
2549 pub async fn delete_devices(
2550 &self,
2551 devices: &[OwnedDeviceId],
2552 auth_data: Option<uiaa::AuthData>,
2553 ) -> HttpResult<delete_devices::v3::Response> {
2554 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2555 request.auth = auth_data;
2556
2557 self.send(request).await
2558 }
2559
2560 /// Change the display name of a device owned by the current user.
2561 ///
2562 /// Returns a `update_device::Response` which specifies the result
2563 /// of the operation.
2564 ///
2565 /// # Arguments
2566 ///
2567 /// * `device_id` - The ID of the device to change the display name of.
2568 /// * `display_name` - The new display name to set.
2569 pub async fn rename_device(
2570 &self,
2571 device_id: &DeviceId,
2572 display_name: &str,
2573 ) -> HttpResult<update_device::v3::Response> {
2574 let mut request = update_device::v3::Request::new(device_id.to_owned());
2575 request.display_name = Some(display_name.to_owned());
2576
2577 self.send(request).await
2578 }
2579
2580 /// Check whether a device with a specific ID exists on the server.
2581 ///
2582 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2583 /// with 404 and the underlying error otherwise.
2584 ///
2585 /// # Arguments
2586 ///
2587 /// * `device_id` - The ID of the device to query.
2588 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2589 let request = device::get_device::v3::Request::new(device_id);
2590 match self.send(request).await {
2591 Ok(_) => Ok(true),
2592 Err(err) => {
2593 if let Some(error) = err.as_client_api_error()
2594 && error.status_code == 404
2595 {
2596 Ok(false)
2597 } else {
2598 Err(err.into())
2599 }
2600 }
2601 }
2602 }
2603
2604 /// Synchronize the client's state with the latest state on the server.
2605 ///
2606 /// ## Syncing Events
2607 ///
2608 /// Messages or any other type of event need to be periodically fetched from
2609 /// the server, this is achieved by sending a `/sync` request to the server.
2610 ///
2611 /// The first sync is sent out without a [`token`]. The response of the
2612 /// first sync will contain a [`next_batch`] field which should then be
2613 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2614 /// don't receive the same events multiple times.
2615 ///
2616 /// ## Long Polling
2617 ///
2618 /// A sync should in the usual case always be in flight. The
2619 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2620 /// long the server will wait for new events before it will respond.
2621 /// The server will respond immediately if some new events arrive before the
2622 /// timeout has expired. If no changes arrive and the timeout expires an
2623 /// empty sync response will be sent to the client.
2624 ///
2625 /// This method of sending a request that may not receive a response
2626 /// immediately is called long polling.
2627 ///
2628 /// ## Filtering Events
2629 ///
2630 /// The number or type of messages and events that the client should receive
2631 /// from the server can be altered using a [`Filter`].
2632 ///
2633 /// Filters can be non-trivial and, since they will be sent with every sync
2634 /// request, they may take up a bunch of unnecessary bandwidth.
2635 ///
2636 /// Luckily filters can be uploaded to the server and reused using an unique
2637 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2638 /// method.
2639 ///
2640 /// # Arguments
2641 ///
2642 /// * `sync_settings` - Settings for the sync call, this allows us to set
2643 /// various options to configure the sync:
2644 /// * [`filter`] - To configure which events we receive and which get
2645 /// [filtered] by the server
2646 /// * [`timeout`] - To configure our [long polling] setup.
2647 /// * [`token`] - To tell the server which events we already received
2648 /// and where we wish to continue syncing.
2649 /// * [`full_state`] - To tell the server that we wish to receive all
2650 /// state events, regardless of our configured [`token`].
2651 /// * [`set_presence`] - To tell the server to set the presence and to
2652 /// which state.
2653 ///
2654 /// # Examples
2655 ///
2656 /// ```no_run
2657 /// # use url::Url;
2658 /// # async {
2659 /// # let homeserver = Url::parse("http://localhost:8080")?;
2660 /// # let username = "";
2661 /// # let password = "";
2662 /// use matrix_sdk::{
2663 /// Client, config::SyncSettings,
2664 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2665 /// };
2666 ///
2667 /// let client = Client::new(homeserver).await?;
2668 /// client.matrix_auth().login_username(username, password).send().await?;
2669 ///
2670 /// // Sync once so we receive the client state and old messages.
2671 /// client.sync_once(SyncSettings::default()).await?;
2672 ///
2673 /// // Register our handler so we start responding once we receive a new
2674 /// // event.
2675 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2676 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2677 /// });
2678 ///
2679 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2680 /// // from our `sync_once()` call automatically.
2681 /// client.sync(SyncSettings::default()).await;
2682 /// # anyhow::Ok(()) };
2683 /// ```
2684 ///
2685 /// [`sync`]: #method.sync
2686 /// [`SyncSettings`]: crate::config::SyncSettings
2687 /// [`token`]: crate::config::SyncSettings#method.token
2688 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2689 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2690 /// [`set_presence`]: ruma::presence::PresenceState
2691 /// [`filter`]: crate::config::SyncSettings#method.filter
2692 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2693 /// [`next_batch`]: SyncResponse#structfield.next_batch
2694 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2695 /// [long polling]: #long-polling
2696 /// [filtered]: #filtering-events
2697 #[instrument(skip(self))]
2698 pub async fn sync_once(
2699 &self,
2700 sync_settings: crate::config::SyncSettings,
2701 ) -> Result<SyncResponse> {
2702 // The sync might not return for quite a while due to the timeout.
2703 // We'll see if there's anything crypto related to send out before we
2704 // sync, i.e. if we closed our client after a sync but before the
2705 // crypto requests were sent out.
2706 //
2707 // This will mostly be a no-op.
2708 #[cfg(feature = "e2e-encryption")]
2709 if let Err(e) = self.send_outgoing_requests().await {
2710 error!(error = ?e, "Error while sending outgoing E2EE requests");
2711 }
2712
2713 let token = match sync_settings.token {
2714 SyncToken::Specific(token) => Some(token),
2715 SyncToken::NoToken => None,
2716 SyncToken::ReusePrevious => self.sync_token().await,
2717 };
2718
2719 let request = assign!(sync_events::v3::Request::new(), {
2720 filter: sync_settings.filter.map(|f| *f),
2721 since: token,
2722 full_state: sync_settings.full_state,
2723 set_presence: sync_settings.set_presence,
2724 timeout: sync_settings.timeout,
2725 use_state_after: true,
2726 });
2727 let mut request_config = self.request_config();
2728 if let Some(timeout) = sync_settings.timeout {
2729 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2730 request_config.timeout = Some(base_timeout + timeout);
2731 }
2732
2733 let response = self.send(request).with_request_config(request_config).await?;
2734 let next_batch = response.next_batch.clone();
2735 let response = self.process_sync(response).await?;
2736
2737 #[cfg(feature = "e2e-encryption")]
2738 if let Err(e) = self.send_outgoing_requests().await {
2739 error!(error = ?e, "Error while sending outgoing E2EE requests");
2740 }
2741
2742 self.inner.sync_beat.notify(usize::MAX);
2743
2744 Ok(SyncResponse::new(next_batch, response))
2745 }
2746
2747 /// Repeatedly synchronize the client state with the server.
2748 ///
2749 /// This method will only return on error, if cancellation is needed
2750 /// the method should be wrapped in a cancelable task or the
2751 /// [`Client::sync_with_callback`] method can be used or
2752 /// [`Client::sync_with_result_callback`] if you want to handle error
2753 /// cases in the loop, too.
2754 ///
2755 /// This method will internally call [`Client::sync_once`] in a loop.
2756 ///
2757 /// This method can be used with the [`Client::add_event_handler`]
2758 /// method to react to individual events. If you instead wish to handle
2759 /// events in a bulk manner the [`Client::sync_with_callback`],
2760 /// [`Client::sync_with_result_callback`] and
2761 /// [`Client::sync_stream`] methods can be used instead. Those methods
2762 /// repeatedly return the whole sync response.
2763 ///
2764 /// # Arguments
2765 ///
2766 /// * `sync_settings` - Settings for the sync call. *Note* that those
2767 /// settings will be only used for the first sync call. See the argument
2768 /// docs for [`Client::sync_once`] for more info.
2769 ///
2770 /// # Return
2771 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2772 /// up to the user of the API to check the error and decide whether the sync
2773 /// should continue or not.
2774 ///
2775 /// # Examples
2776 ///
2777 /// ```no_run
2778 /// # use url::Url;
2779 /// # async {
2780 /// # let homeserver = Url::parse("http://localhost:8080")?;
2781 /// # let username = "";
2782 /// # let password = "";
2783 /// use matrix_sdk::{
2784 /// Client, config::SyncSettings,
2785 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2786 /// };
2787 ///
2788 /// let client = Client::new(homeserver).await?;
2789 /// client.matrix_auth().login_username(&username, &password).send().await?;
2790 ///
2791 /// // Register our handler so we start responding once we receive a new
2792 /// // event.
2793 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2794 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2795 /// });
2796 ///
2797 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2798 /// // automatically.
2799 /// client.sync(SyncSettings::default()).await?;
2800 /// # anyhow::Ok(()) };
2801 /// ```
2802 ///
2803 /// [argument docs]: #method.sync_once
2804 /// [`sync_with_callback`]: #method.sync_with_callback
2805 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2806 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2807 }
2808
2809 /// Repeatedly call sync to synchronize the client state with the server.
2810 ///
2811 /// # Arguments
2812 ///
2813 /// * `sync_settings` - Settings for the sync call. *Note* that those
2814 /// settings will be only used for the first sync call. See the argument
2815 /// docs for [`Client::sync_once`] for more info.
2816 ///
2817 /// * `callback` - A callback that will be called every time a successful
2818 /// response has been fetched from the server. The callback must return a
2819 /// boolean which signalizes if the method should stop syncing. If the
2820 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2821 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2822 ///
2823 /// # Return
2824 /// The sync runs until an error occurs or the
2825 /// callback indicates that the Loop should stop. If the callback asked for
2826 /// a regular stop, the result will be `Ok(())` otherwise the
2827 /// `Err(Error)` is returned.
2828 ///
2829 /// # Examples
2830 ///
2831 /// The following example demonstrates how to sync forever while sending all
2832 /// the interesting events through a mpsc channel to another thread e.g. a
2833 /// UI thread.
2834 ///
2835 /// ```no_run
2836 /// # use std::time::Duration;
2837 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2838 /// # use url::Url;
2839 /// # async {
2840 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2841 /// # let mut client = Client::new(homeserver).await.unwrap();
2842 ///
2843 /// use tokio::sync::mpsc::channel;
2844 ///
2845 /// let (tx, rx) = channel(100);
2846 ///
2847 /// let sync_channel = &tx;
2848 /// let sync_settings = SyncSettings::new()
2849 /// .timeout(Duration::from_secs(30));
2850 ///
2851 /// client
2852 /// .sync_with_callback(sync_settings, |response| async move {
2853 /// let channel = sync_channel;
2854 /// for (room_id, room) in response.rooms.joined {
2855 /// for event in room.timeline.events {
2856 /// channel.send(event).await.unwrap();
2857 /// }
2858 /// }
2859 ///
2860 /// LoopCtrl::Continue
2861 /// })
2862 /// .await;
2863 /// };
2864 /// ```
2865 #[instrument(skip_all)]
2866 pub async fn sync_with_callback<C>(
2867 &self,
2868 sync_settings: crate::config::SyncSettings,
2869 callback: impl Fn(SyncResponse) -> C,
2870 ) -> Result<(), Error>
2871 where
2872 C: Future<Output = LoopCtrl>,
2873 {
2874 self.sync_with_result_callback(sync_settings, |result| async {
2875 Ok(callback(result?).await)
2876 })
2877 .await
2878 }
2879
2880 /// Repeatedly call sync to synchronize the client state with the server.
2881 ///
2882 /// # Arguments
2883 ///
2884 /// * `sync_settings` - Settings for the sync call. *Note* that those
2885 /// settings will be only used for the first sync call. See the argument
2886 /// docs for [`Client::sync_once`] for more info.
2887 ///
2888 /// * `callback` - A callback that will be called every time after a
2889 /// response has been received, failure or not. The callback returns a
2890 /// `Result<LoopCtrl, Error>`, too. When returning
2891 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2892 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2893 /// function returns `Ok(())`. In case the callback can't handle the
2894 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2895 /// which results in the sync ending and the `Err(Error)` being returned.
2896 ///
2897 /// # Return
2898 /// The sync runs until an error occurs that the callback can't handle or
2899 /// the callback indicates that the Loop should stop. If the callback
2900 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2901 /// `Err(Error)` is returned.
2902 ///
2903 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2904 /// this, and are handled first without sending the result to the
2905 /// callback. Only after they have exceeded is the `Result` handed to
2906 /// the callback.
2907 ///
2908 /// # Examples
2909 ///
2910 /// The following example demonstrates how to sync forever while sending all
2911 /// the interesting events through a mpsc channel to another thread e.g. a
2912 /// UI thread.
2913 ///
2914 /// ```no_run
2915 /// # use std::time::Duration;
2916 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2917 /// # use url::Url;
2918 /// # async {
2919 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2920 /// # let mut client = Client::new(homeserver).await.unwrap();
2921 /// #
2922 /// use tokio::sync::mpsc::channel;
2923 ///
2924 /// let (tx, rx) = channel(100);
2925 ///
2926 /// let sync_channel = &tx;
2927 /// let sync_settings = SyncSettings::new()
2928 /// .timeout(Duration::from_secs(30));
2929 ///
2930 /// client
2931 /// .sync_with_result_callback(sync_settings, |response| async move {
2932 /// let channel = sync_channel;
2933 /// let sync_response = response?;
2934 /// for (room_id, room) in sync_response.rooms.joined {
2935 /// for event in room.timeline.events {
2936 /// channel.send(event).await.unwrap();
2937 /// }
2938 /// }
2939 ///
2940 /// Ok(LoopCtrl::Continue)
2941 /// })
2942 /// .await;
2943 /// };
2944 /// ```
2945 #[instrument(skip(self, callback))]
2946 pub async fn sync_with_result_callback<C>(
2947 &self,
2948 sync_settings: crate::config::SyncSettings,
2949 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2950 ) -> Result<(), Error>
2951 where
2952 C: Future<Output = Result<LoopCtrl, Error>>,
2953 {
2954 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2955
2956 while let Some(result) = sync_stream.next().await {
2957 trace!("Running callback");
2958 if callback(result).await? == LoopCtrl::Break {
2959 trace!("Callback told us to stop");
2960 break;
2961 }
2962 trace!("Done running callback");
2963 }
2964
2965 Ok(())
2966 }
2967
2968 //// Repeatedly synchronize the client state with the server.
2969 ///
2970 /// This method will internally call [`Client::sync_once`] in a loop and is
2971 /// equivalent to the [`Client::sync`] method but the responses are provided
2972 /// as an async stream.
2973 ///
2974 /// # Arguments
2975 ///
2976 /// * `sync_settings` - Settings for the sync call. *Note* that those
2977 /// settings will be only used for the first sync call. See the argument
2978 /// docs for [`Client::sync_once`] for more info.
2979 ///
2980 /// # Examples
2981 ///
2982 /// ```no_run
2983 /// # use url::Url;
2984 /// # async {
2985 /// # let homeserver = Url::parse("http://localhost:8080")?;
2986 /// # let username = "";
2987 /// # let password = "";
2988 /// use futures_util::StreamExt;
2989 /// use matrix_sdk::{Client, config::SyncSettings};
2990 ///
2991 /// let client = Client::new(homeserver).await?;
2992 /// client.matrix_auth().login_username(&username, &password).send().await?;
2993 ///
2994 /// let mut sync_stream =
2995 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2996 ///
2997 /// while let Some(Ok(response)) = sync_stream.next().await {
2998 /// for room in response.rooms.joined.values() {
2999 /// for e in &room.timeline.events {
3000 /// if let Ok(event) = e.raw().deserialize() {
3001 /// println!("Received event {:?}", event);
3002 /// }
3003 /// }
3004 /// }
3005 /// }
3006 ///
3007 /// # anyhow::Ok(()) };
3008 /// ```
3009 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3010 #[instrument(skip(self))]
3011 pub async fn sync_stream(
3012 &self,
3013 mut sync_settings: crate::config::SyncSettings,
3014 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3015 let mut is_first_sync = true;
3016 let mut timeout = None;
3017 let mut last_sync_time: Option<Instant> = None;
3018
3019 let parent_span = Span::current();
3020
3021 async_stream::stream!({
3022 loop {
3023 trace!("Syncing");
3024
3025 if sync_settings.ignore_timeout_on_first_sync {
3026 if is_first_sync {
3027 timeout = sync_settings.timeout.take();
3028 } else if sync_settings.timeout.is_none() && timeout.is_some() {
3029 sync_settings.timeout = timeout.take();
3030 }
3031
3032 is_first_sync = false;
3033 }
3034
3035 yield self
3036 .sync_loop_helper(&mut sync_settings)
3037 .instrument(parent_span.clone())
3038 .await;
3039
3040 Client::delay_sync(&mut last_sync_time).await
3041 }
3042 })
3043 }
3044
3045 /// Get the current, if any, sync token of the client.
3046 /// This will be None if the client didn't sync at least once.
3047 pub(crate) async fn sync_token(&self) -> Option<String> {
3048 self.inner.base_client.sync_token().await
3049 }
3050
3051 /// Gets information about the owner of a given access token.
3052 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3053 let request = whoami::v3::Request::new();
3054 self.send(request).await
3055 }
3056
3057 /// Subscribes a new receiver to client SessionChange broadcasts.
3058 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3059 let broadcast = &self.auth_ctx().session_change_sender;
3060 broadcast.subscribe()
3061 }
3062
3063 /// Sets the save/restore session callbacks.
3064 ///
3065 /// This is another mechanism to get synchronous updates to session tokens,
3066 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3067 pub fn set_session_callbacks(
3068 &self,
3069 reload_session_callback: Box<ReloadSessionCallback>,
3070 save_session_callback: Box<SaveSessionCallback>,
3071 ) -> Result<()> {
3072 self.inner
3073 .auth_ctx
3074 .reload_session_callback
3075 .set(reload_session_callback)
3076 .map_err(|_| Error::MultipleSessionCallbacks)?;
3077
3078 self.inner
3079 .auth_ctx
3080 .save_session_callback
3081 .set(save_session_callback)
3082 .map_err(|_| Error::MultipleSessionCallbacks)?;
3083
3084 Ok(())
3085 }
3086
3087 /// Get the notification settings of the current owner of the client.
3088 pub async fn notification_settings(&self) -> NotificationSettings {
3089 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3090 NotificationSettings::new(self.clone(), ruleset)
3091 }
3092
3093 /// Create a new specialized `Client` that can process notifications.
3094 ///
3095 /// See [`CrossProcessLock::new`] to learn more about
3096 /// `cross_process_lock_config`.
3097 ///
3098 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3099 pub async fn notification_client(
3100 &self,
3101 cross_process_lock_config: CrossProcessLockConfig,
3102 ) -> Result<Client> {
3103 let client = Client {
3104 inner: ClientInner::new(
3105 self.inner.auth_ctx.clone(),
3106 self.server().cloned(),
3107 self.homeserver(),
3108 self.sliding_sync_version(),
3109 self.inner.http_client.clone(),
3110 self.inner
3111 .base_client
3112 .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3113 .await?,
3114 self.inner.caches.supported_versions.read().await.clone(),
3115 self.inner.caches.well_known.read().await.clone(),
3116 self.inner.respect_login_well_known,
3117 self.inner.event_cache.clone(),
3118 self.inner.send_queue_data.clone(),
3119 self.inner.latest_events.clone(),
3120 #[cfg(feature = "e2e-encryption")]
3121 self.inner.e2ee.encryption_settings,
3122 #[cfg(feature = "e2e-encryption")]
3123 self.inner.enable_share_history_on_invite,
3124 cross_process_lock_config,
3125 #[cfg(feature = "experimental-search")]
3126 self.inner.search_index.clone(),
3127 self.inner.thread_subscription_catchup.clone(),
3128 )
3129 .await,
3130 };
3131
3132 Ok(client)
3133 }
3134
3135 /// The [`EventCache`] instance for this [`Client`].
3136 pub fn event_cache(&self) -> &EventCache {
3137 // SAFETY: always initialized in the `Client` ctor.
3138 self.inner.event_cache.get().unwrap()
3139 }
3140
3141 /// The [`LatestEvents`] instance for this [`Client`].
3142 pub async fn latest_events(&self) -> &LatestEvents {
3143 self.inner
3144 .latest_events
3145 .get_or_init(|| async {
3146 LatestEvents::new(
3147 WeakClient::from_client(self),
3148 self.event_cache().clone(),
3149 SendQueue::new(self.clone()),
3150 self.room_info_notable_update_receiver(),
3151 )
3152 })
3153 .await
3154 }
3155
3156 /// Waits until an at least partially synced room is received, and returns
3157 /// it.
3158 ///
3159 /// **Note: this function will loop endlessly until either it finds the room
3160 /// or an externally set timeout happens.**
3161 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3162 loop {
3163 if let Some(room) = self.get_room(room_id) {
3164 if room.is_state_partially_or_fully_synced() {
3165 debug!("Found just created room!");
3166 return room;
3167 }
3168 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3169 } else {
3170 debug!("Room wasn't found, waiting for sync beat to try again");
3171 }
3172 self.inner.sync_beat.listen().await;
3173 }
3174 }
3175
3176 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3177 /// join it.
3178 pub async fn knock(
3179 &self,
3180 room_id_or_alias: OwnedRoomOrAliasId,
3181 reason: Option<String>,
3182 server_names: Vec<OwnedServerName>,
3183 ) -> Result<Room> {
3184 let request =
3185 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3186 let response = self.send(request).await?;
3187 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3188 Ok(Room::new(self.clone(), base_room))
3189 }
3190
3191 /// Checks whether the provided `user_id` belongs to an ignored user.
3192 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3193 self.base_client().is_user_ignored(user_id).await
3194 }
3195
3196 /// Gets the `max_upload_size` value from the homeserver, getting either a
3197 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3198 /// missing.
3199 ///
3200 /// Check the spec for more info:
3201 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3202 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3203 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3204 if let Some(data) = max_upload_size_lock.get() {
3205 return Ok(data.to_owned());
3206 }
3207
3208 // Use the authenticated endpoint when the server supports it.
3209 let supported_versions = self.supported_versions().await?;
3210 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3211 .is_supported(&supported_versions);
3212
3213 let upload_size = if use_auth {
3214 self.send(authenticated_media::get_media_config::v1::Request::default())
3215 .await?
3216 .upload_size
3217 } else {
3218 #[allow(deprecated)]
3219 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3220 };
3221
3222 match max_upload_size_lock.set(upload_size) {
3223 Ok(_) => Ok(upload_size),
3224 Err(error) => {
3225 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3226 }
3227 }
3228 }
3229
3230 /// The settings to use for decrypting events.
3231 #[cfg(feature = "e2e-encryption")]
3232 pub fn decryption_settings(&self) -> &DecryptionSettings {
3233 &self.base_client().decryption_settings
3234 }
3235
3236 /// Returns the [`SearchIndex`] for this [`Client`].
3237 #[cfg(feature = "experimental-search")]
3238 pub fn search_index(&self) -> &SearchIndex {
3239 &self.inner.search_index
3240 }
3241
3242 /// Whether the client is configured to take thread subscriptions (MSC4306
3243 /// and MSC4308) into account, and the server enabled the experimental
3244 /// feature flag for it.
3245 ///
3246 /// This may cause filtering out of thread subscriptions, and loading the
3247 /// thread subscriptions via the sliding sync extension, when the room
3248 /// list service is being used.
3249 ///
3250 /// This is async and fallible as it may use the network to retrieve the
3251 /// server supported features, if they aren't cached already.
3252 pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3253 // Check if the client is configured to support thread subscriptions first.
3254 match self.base_client().threading_support {
3255 ThreadingSupport::Enabled { with_subscriptions: false }
3256 | ThreadingSupport::Disabled => return Ok(false),
3257 ThreadingSupport::Enabled { with_subscriptions: true } => {}
3258 }
3259
3260 // Now, let's check that the server supports it.
3261 let server_enabled = self
3262 .supported_versions()
3263 .await?
3264 .features
3265 .contains(&FeatureFlag::from("org.matrix.msc4306"));
3266
3267 Ok(server_enabled)
3268 }
3269
3270 /// Fetch thread subscriptions changes between `from` and up to `to`.
3271 ///
3272 /// The `limit` optional parameter can be used to limit the number of
3273 /// entries in a response. It can also be overridden by the server, if
3274 /// it's deemed too large.
3275 pub async fn fetch_thread_subscriptions(
3276 &self,
3277 from: Option<String>,
3278 to: Option<String>,
3279 limit: Option<UInt>,
3280 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3281 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3282 from,
3283 to,
3284 limit,
3285 });
3286 Ok(self.send(request).await?)
3287 }
3288
3289 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3290 self.inner.thread_subscription_catchup.get().unwrap()
3291 }
3292
3293 /// Perform database optimizations if any are available, i.e. vacuuming in
3294 /// SQLite.
3295 ///
3296 /// **Warning:** this was added to check if SQLite fragmentation was the
3297 /// source of performance issues, **DO NOT use in production**.
3298 #[doc(hidden)]
3299 pub async fn optimize_stores(&self) -> Result<()> {
3300 trace!("Optimizing state store...");
3301 self.state_store().optimize().await?;
3302
3303 trace!("Optimizing event cache store...");
3304 if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3305 clean_lock.optimize().await?;
3306 }
3307
3308 trace!("Optimizing media store...");
3309 self.media_store().lock().await?.optimize().await?;
3310
3311 Ok(())
3312 }
3313
3314 /// Returns the sizes of the existing stores, if known.
3315 pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3316 #[cfg(feature = "e2e-encryption")]
3317 let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3318 && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3319 {
3320 Some(store_size)
3321 } else {
3322 None
3323 };
3324 #[cfg(not(feature = "e2e-encryption"))]
3325 let crypto_store_size = None;
3326
3327 let state_store_size = self.state_store().get_size().await.ok().flatten();
3328
3329 let event_cache_store_size = if let Some(clean_lock) =
3330 self.event_cache_store().lock().await?.as_clean()
3331 && let Ok(Some(store_size)) = clean_lock.get_size().await
3332 {
3333 Some(store_size)
3334 } else {
3335 None
3336 };
3337
3338 let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3339
3340 Ok(StoreSizes {
3341 crypto_store: crypto_store_size,
3342 state_store: state_store_size,
3343 event_cache_store: event_cache_store_size,
3344 media_store: media_store_size,
3345 })
3346 }
3347
3348 /// Get a reference to the client's task monitor, for spawning background
3349 /// tasks.
3350 pub fn task_monitor(&self) -> &TaskMonitor {
3351 &self.inner.task_monitor
3352 }
3353
3354 /// Add a subscriber for duplicate key upload error notifications triggered
3355 /// by requests to /keys/upload.
3356 #[cfg(feature = "e2e-encryption")]
3357 pub fn subscribe_to_duplicate_key_upload_errors(
3358 &self,
3359 ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3360 self.inner.duplicate_key_upload_error_sender.subscribe()
3361 }
3362
3363 /// Check the record of whether we are waiting for an [MSC4268] key bundle
3364 /// for the given room.
3365 ///
3366 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3367 #[cfg(feature = "e2e-encryption")]
3368 pub async fn get_pending_key_bundle_details_for_room(
3369 &self,
3370 room_id: &RoomId,
3371 ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3372 Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3373 }
3374}
3375
3376/// Contains the disk size of the different stores, if known. It won't be
3377/// available for in-memory stores.
3378#[derive(Debug, Clone)]
3379pub struct StoreSizes {
3380 /// The size of the CryptoStore.
3381 pub crypto_store: Option<usize>,
3382 /// The size of the StateStore.
3383 pub state_store: Option<usize>,
3384 /// The size of the EventCacheStore.
3385 pub event_cache_store: Option<usize>,
3386 /// The size of the MediaStore.
3387 pub media_store: Option<usize>,
3388}
3389
3390#[cfg(any(feature = "testing", test))]
3391impl Client {
3392 /// Test helper to mark users as tracked by the crypto layer.
3393 #[cfg(feature = "e2e-encryption")]
3394 pub async fn update_tracked_users_for_testing(
3395 &self,
3396 user_ids: impl IntoIterator<Item = &UserId>,
3397 ) {
3398 let olm = self.olm_machine().await;
3399 let olm = olm.as_ref().unwrap();
3400 olm.update_tracked_users(user_ids).await.unwrap();
3401 }
3402}
3403
3404/// A weak reference to the inner client, useful when trying to get a handle
3405/// on the owning client.
3406#[derive(Clone, Debug)]
3407pub(crate) struct WeakClient {
3408 client: Weak<ClientInner>,
3409}
3410
3411impl WeakClient {
3412 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3413 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3414 Self { client: Arc::downgrade(client) }
3415 }
3416
3417 /// Construct a [`WeakClient`] from a [`Client`].
3418 pub fn from_client(client: &Client) -> Self {
3419 Self::from_inner(&client.inner)
3420 }
3421
3422 /// Attempts to get a [`Client`] from this [`WeakClient`].
3423 pub fn get(&self) -> Option<Client> {
3424 self.client.upgrade().map(|inner| Client { inner })
3425 }
3426
3427 /// Gets the number of strong (`Arc`) pointers still pointing to this
3428 /// client.
3429 #[allow(dead_code)]
3430 pub fn strong_count(&self) -> usize {
3431 self.client.strong_count()
3432 }
3433}
3434
3435/// Information about the state of a room before we joined it.
3436#[derive(Debug, Clone, Default)]
3437struct PreJoinRoomInfo {
3438 /// The user who invited us to the room, if any.
3439 pub inviter: Option<RoomMember>,
3440}
3441
3442// The http mocking library is not supported for wasm32
3443#[cfg(all(test, not(target_family = "wasm")))]
3444pub(crate) mod tests {
3445 use std::{sync::Arc, time::Duration};
3446
3447 use assert_matches::assert_matches;
3448 use assert_matches2::assert_let;
3449 use eyeball::SharedObservable;
3450 use futures_util::{FutureExt, StreamExt, pin_mut};
3451 use js_int::{UInt, uint};
3452 use matrix_sdk_base::{
3453 RoomState,
3454 store::{MemoryStore, StoreConfig},
3455 };
3456 use matrix_sdk_test::{
3457 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3458 event_factory::EventFactory,
3459 };
3460 #[cfg(target_family = "wasm")]
3461 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3462
3463 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3464 use ruma::{
3465 RoomId, ServerName, UserId,
3466 api::{
3467 FeatureFlag, MatrixVersion,
3468 client::{
3469 discovery::discover_homeserver::RtcFocusInfo,
3470 room::create_room::v3::Request as CreateRoomRequest,
3471 },
3472 },
3473 assign,
3474 events::{
3475 ignored_user_list::IgnoredUserListEventContent,
3476 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3477 },
3478 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3479 };
3480 use serde_json::json;
3481 use stream_assert::{assert_next_matches, assert_pending};
3482 use tokio::{
3483 spawn,
3484 time::{sleep, timeout},
3485 };
3486 use url::Url;
3487
3488 use super::Client;
3489 use crate::{
3490 Error, TransmissionProgress,
3491 client::{WeakClient, futures::SendMediaUploadRequest},
3492 config::{RequestConfig, SyncSettings},
3493 futures::SendRequest,
3494 media::MediaError,
3495 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3496 };
3497
3498 #[async_test]
3499 async fn test_account_data() {
3500 let server = MatrixMockServer::new().await;
3501 let client = server.client_builder().build().await;
3502
3503 let f = EventFactory::new();
3504 server
3505 .mock_sync()
3506 .ok_and_run(&client, |builder| {
3507 builder.add_global_account_data(
3508 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3509 );
3510 })
3511 .await;
3512
3513 let content = client
3514 .account()
3515 .account_data::<IgnoredUserListEventContent>()
3516 .await
3517 .unwrap()
3518 .unwrap()
3519 .deserialize()
3520 .unwrap();
3521
3522 assert_eq!(content.ignored_users.len(), 1);
3523 }
3524
3525 #[async_test]
3526 async fn test_successful_discovery() {
3527 // Imagine this is `matrix.org`.
3528 let server = MatrixMockServer::new().await;
3529 let server_url = server.uri();
3530
3531 // Imagine this is `matrix-client.matrix.org`.
3532 let homeserver = MatrixMockServer::new().await;
3533 let homeserver_url = homeserver.uri();
3534
3535 // Imagine Alice has the user ID `@alice:matrix.org`.
3536 let domain = server_url.strip_prefix("http://").unwrap();
3537 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3538
3539 // The `.well-known` is on the server (e.g. `matrix.org`).
3540 server
3541 .mock_well_known()
3542 .ok_with_homeserver_url(&homeserver_url)
3543 .mock_once()
3544 .named("well-known")
3545 .mount()
3546 .await;
3547
3548 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3549 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3550
3551 let client = Client::builder()
3552 .insecure_server_name_no_tls(alice.server_name())
3553 .build()
3554 .await
3555 .unwrap();
3556
3557 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3558 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3559 client.server_versions().await.unwrap();
3560 }
3561
3562 #[async_test]
3563 async fn test_discovery_broken_server() {
3564 let server = MatrixMockServer::new().await;
3565 let server_url = server.uri();
3566 let domain = server_url.strip_prefix("http://").unwrap();
3567 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3568
3569 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3570
3571 assert!(
3572 Client::builder()
3573 .insecure_server_name_no_tls(alice.server_name())
3574 .build()
3575 .await
3576 .is_err(),
3577 "Creating a client from a user ID should fail when the .well-known request fails."
3578 );
3579 }
3580
3581 #[async_test]
3582 async fn test_room_creation() {
3583 let server = MatrixMockServer::new().await;
3584 let client = server.client_builder().build().await;
3585
3586 let f = EventFactory::new().sender(user_id!("@example:localhost"));
3587 server
3588 .mock_sync()
3589 .ok_and_run(&client, |builder| {
3590 builder.add_joined_room(
3591 JoinedRoomBuilder::default()
3592 .add_state_event(
3593 f.member(user_id!("@example:localhost")).display_name("example"),
3594 )
3595 .add_state_event(f.default_power_levels()),
3596 );
3597 })
3598 .await;
3599
3600 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3601 assert_eq!(room.state(), RoomState::Joined);
3602 }
3603
3604 #[async_test]
3605 async fn test_retry_limit_http_requests() {
3606 let server = MatrixMockServer::new().await;
3607 let client = server
3608 .client_builder()
3609 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3610 .build()
3611 .await;
3612
3613 assert!(client.request_config().retry_limit.unwrap() == 4);
3614
3615 server.mock_who_am_i().error500().expect(4).mount().await;
3616
3617 client.whoami().await.unwrap_err();
3618 }
3619
3620 #[async_test]
3621 async fn test_retry_timeout_http_requests() {
3622 // Keep this timeout small so that the test doesn't take long
3623 let retry_timeout = Duration::from_secs(5);
3624 let server = MatrixMockServer::new().await;
3625 let client = server
3626 .client_builder()
3627 .on_builder(|builder| {
3628 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3629 })
3630 .build()
3631 .await;
3632
3633 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3634
3635 server.mock_login().error500().expect(2..).mount().await;
3636
3637 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3638 }
3639
3640 #[async_test]
3641 async fn test_short_retry_initial_http_requests() {
3642 let server = MatrixMockServer::new().await;
3643 let client = server
3644 .client_builder()
3645 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3646 .build()
3647 .await;
3648
3649 server.mock_login().error500().expect(3..).mount().await;
3650
3651 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3652 }
3653
3654 #[async_test]
3655 async fn test_no_retry_http_requests() {
3656 let server = MatrixMockServer::new().await;
3657 let client = server.client_builder().build().await;
3658
3659 server.mock_devices().error500().mock_once().mount().await;
3660
3661 client.devices().await.unwrap_err();
3662 }
3663
3664 #[async_test]
3665 async fn test_set_homeserver() {
3666 let client = MockClientBuilder::new(None).build().await;
3667 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3668
3669 let homeserver = Url::parse("http://example.com/").unwrap();
3670 client.set_homeserver(homeserver.clone());
3671 assert_eq!(client.homeserver(), homeserver);
3672 }
3673
3674 #[async_test]
3675 async fn test_search_user_request() {
3676 let server = MatrixMockServer::new().await;
3677 let client = server.client_builder().build().await;
3678
3679 server.mock_user_directory().ok().mock_once().mount().await;
3680
3681 let response = client.search_users("test", 50).await.unwrap();
3682 assert_eq!(response.results.len(), 1);
3683 let result = &response.results[0];
3684 assert_eq!(result.user_id.to_string(), "@test:example.me");
3685 assert_eq!(result.display_name.clone().unwrap(), "Test");
3686 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3687 assert!(!response.limited);
3688 }
3689
3690 #[async_test]
3691 async fn test_request_unstable_features() {
3692 let server = MatrixMockServer::new().await;
3693 let client = server.client_builder().no_server_versions().build().await;
3694
3695 server
3696 .mock_versions()
3697 .with_feature("org.matrix.e2e_cross_signing", true)
3698 .ok()
3699 .mock_once()
3700 .mount()
3701 .await;
3702
3703 let unstable_features = client.unstable_features().await.unwrap();
3704 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3705 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3706 }
3707
3708 #[async_test]
3709 async fn test_can_homeserver_push_encrypted_event_to_device() {
3710 let server = MatrixMockServer::new().await;
3711 let client = server.client_builder().no_server_versions().build().await;
3712
3713 server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3714
3715 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3716 assert!(msc4028_enabled);
3717 }
3718
3719 #[async_test]
3720 async fn test_recently_visited_rooms() {
3721 // Tracking recently visited rooms requires authentication
3722 let client = MockClientBuilder::new(None).unlogged().build().await;
3723 assert_matches!(
3724 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3725 Err(Error::AuthenticationRequired)
3726 );
3727
3728 let client = MockClientBuilder::new(None).build().await;
3729 let account = client.account();
3730
3731 // We should start off with an empty list
3732 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3733
3734 // Tracking a valid room id should add it to the list
3735 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3736 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3737 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3738
3739 // And the existing list shouldn't be changed
3740 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3741 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3742
3743 // Tracking the same room again shouldn't change the list
3744 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3745 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3746 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3747
3748 // Tracking a second room should add it to the front of the list
3749 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3750 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3751 assert_eq!(
3752 account.get_recently_visited_rooms().await.unwrap(),
3753 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3754 );
3755
3756 // Tracking the first room yet again should move it to the front of the list
3757 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3758 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3759 assert_eq!(
3760 account.get_recently_visited_rooms().await.unwrap(),
3761 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3762 );
3763
3764 // Tracking should be capped at 20
3765 for n in 0..20 {
3766 account
3767 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3768 .await
3769 .unwrap();
3770 }
3771
3772 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3773
3774 // And the initial rooms should've been pushed out
3775 let rooms = account.get_recently_visited_rooms().await.unwrap();
3776 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3777 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3778
3779 // And the last tracked room should be the first
3780 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3781 }
3782
3783 #[async_test]
3784 async fn test_client_no_cycle_with_event_cache() {
3785 let client = MockClientBuilder::new(None).build().await;
3786
3787 // Wait for the init tasks to die.
3788 sleep(Duration::from_secs(1)).await;
3789
3790 let weak_client = WeakClient::from_client(&client);
3791 assert_eq!(weak_client.strong_count(), 1);
3792
3793 {
3794 let room_id = room_id!("!room:example.org");
3795
3796 // Have the client know the room.
3797 let response = SyncResponseBuilder::default()
3798 .add_joined_room(JoinedRoomBuilder::new(room_id))
3799 .build_sync_response();
3800 client.inner.base_client.receive_sync_response(response).await.unwrap();
3801
3802 client.event_cache().subscribe().unwrap();
3803
3804 let (_room_event_cache, _drop_handles) =
3805 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3806 }
3807
3808 drop(client);
3809
3810 // Give a bit of time for background tasks to die.
3811 sleep(Duration::from_secs(1)).await;
3812
3813 // The weak client must be the last reference to the client now.
3814 assert_eq!(weak_client.strong_count(), 0);
3815 let client = weak_client.get();
3816 assert!(
3817 client.is_none(),
3818 "too many strong references to the client: {}",
3819 Arc::strong_count(&client.unwrap().inner)
3820 );
3821 }
3822
3823 #[async_test]
3824 async fn test_supported_versions_caching() {
3825 let server = MatrixMockServer::new().await;
3826
3827 let versions_mock = server
3828 .mock_versions()
3829 .expect_default_access_token()
3830 .with_feature("org.matrix.e2e_cross_signing", true)
3831 .ok()
3832 .named("first versions mock")
3833 .expect(1)
3834 .mount_as_scoped()
3835 .await;
3836
3837 let memory_store = Arc::new(MemoryStore::new());
3838 let client = server
3839 .client_builder()
3840 .no_server_versions()
3841 .on_builder(|builder| {
3842 builder.store_config(
3843 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3844 .state_store(memory_store.clone()),
3845 )
3846 })
3847 .build()
3848 .await;
3849
3850 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3851
3852 // The result was cached.
3853 assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3854 // This subsequent call hits the in-memory cache.
3855 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3856
3857 drop(client);
3858
3859 let client = server
3860 .client_builder()
3861 .no_server_versions()
3862 .on_builder(|builder| {
3863 builder.store_config(
3864 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3865 .state_store(memory_store.clone()),
3866 )
3867 })
3868 .build()
3869 .await;
3870
3871 // These calls to the new client hit the on-disk cache.
3872 assert!(
3873 client
3874 .unstable_features()
3875 .await
3876 .unwrap()
3877 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3878 );
3879 let supported = client.supported_versions().await.unwrap();
3880 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3881 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3882
3883 // Then this call hits the in-memory cache.
3884 let supported = client.supported_versions().await.unwrap();
3885 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3886 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3887
3888 drop(versions_mock);
3889
3890 // Now, reset the cache, and observe the endpoints being called again once.
3891 client.reset_supported_versions().await.unwrap();
3892
3893 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3894
3895 // Hits network again.
3896 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3897 // Hits in-memory cache again.
3898 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3899 }
3900
3901 #[async_test]
3902 async fn test_well_known_caching() {
3903 let server = MatrixMockServer::new().await;
3904 let server_url = server.uri();
3905 let domain = server_url.strip_prefix("http://").unwrap();
3906 let server_name = <&ServerName>::try_from(domain).unwrap();
3907 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3908
3909 let well_known_mock = server
3910 .mock_well_known()
3911 .ok()
3912 .named("well known mock")
3913 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3914 .mount_as_scoped()
3915 .await;
3916
3917 let memory_store = Arc::new(MemoryStore::new());
3918 let client = Client::builder()
3919 .insecure_server_name_no_tls(server_name)
3920 .store_config(
3921 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3922 .state_store(memory_store.clone()),
3923 )
3924 .build()
3925 .await
3926 .unwrap();
3927
3928 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3929
3930 // This subsequent call hits the in-memory cache.
3931 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3932
3933 drop(client);
3934
3935 let client = server
3936 .client_builder()
3937 .no_server_versions()
3938 .on_builder(|builder| {
3939 builder.store_config(
3940 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3941 .state_store(memory_store.clone()),
3942 )
3943 })
3944 .build()
3945 .await;
3946
3947 // This call to the new client hits the on-disk cache.
3948 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3949
3950 // Then this call hits the in-memory cache.
3951 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3952
3953 drop(well_known_mock);
3954
3955 // Now, reset the cache, and observe the endpoints being called again once.
3956 client.reset_well_known().await.unwrap();
3957
3958 server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3959
3960 // Hits network again.
3961 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3962 // Hits in-memory cache again.
3963 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3964 }
3965
3966 #[async_test]
3967 async fn test_missing_well_known_caching() {
3968 let server = MatrixMockServer::new().await;
3969 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3970
3971 let well_known_mock = server
3972 .mock_well_known()
3973 .error_unrecognized()
3974 .named("first well-known mock")
3975 .expect(1)
3976 .mount_as_scoped()
3977 .await;
3978
3979 let memory_store = Arc::new(MemoryStore::new());
3980 let client = server
3981 .client_builder()
3982 .on_builder(|builder| {
3983 builder.store_config(
3984 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3985 .state_store(memory_store.clone()),
3986 )
3987 })
3988 .build()
3989 .await;
3990
3991 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3992
3993 // This subsequent call hits the in-memory cache.
3994 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3995
3996 drop(client);
3997
3998 let client = server
3999 .client_builder()
4000 .on_builder(|builder| {
4001 builder.store_config(
4002 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4003 .state_store(memory_store.clone()),
4004 )
4005 })
4006 .build()
4007 .await;
4008
4009 // This call to the new client hits the on-disk cache.
4010 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4011
4012 // Then this call hits the in-memory cache.
4013 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4014
4015 drop(well_known_mock);
4016
4017 // Now, reset the cache, and observe the endpoints being called again once.
4018 client.reset_well_known().await.unwrap();
4019
4020 server
4021 .mock_well_known()
4022 .error_unrecognized()
4023 .expect(1)
4024 .named("second well-known mock")
4025 .mount()
4026 .await;
4027
4028 // Hits network again.
4029 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4030 // Hits in-memory cache again.
4031 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4032 }
4033
4034 #[async_test]
4035 async fn test_no_network_doesnt_cause_infinite_retries() {
4036 // We want infinite retries for transient errors.
4037 let client = MockClientBuilder::new(None)
4038 .on_builder(|builder| builder.request_config(RequestConfig::new()))
4039 .build()
4040 .await;
4041
4042 // We don't define a mock server on purpose here, so that the error is really a
4043 // network error.
4044 client.whoami().await.unwrap_err();
4045 }
4046
4047 #[async_test]
4048 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4049 let server = MatrixMockServer::new().await;
4050 let client = server.client_builder().build().await;
4051
4052 let room_id = room_id!("!room:example.org");
4053
4054 server
4055 .mock_sync()
4056 .ok_and_run(&client, |builder| {
4057 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4058 })
4059 .await;
4060
4061 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4062 assert_eq!(room.room_id(), room_id);
4063 }
4064
4065 #[async_test]
4066 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4067 let server = MatrixMockServer::new().await;
4068 let client = server.client_builder().build().await;
4069
4070 let room_id = room_id!("!room:example.org");
4071
4072 let client = Arc::new(client);
4073
4074 // Perform the /sync request with a delay so it starts after the
4075 // `await_room_remote_echo` call has happened
4076 spawn({
4077 let client = client.clone();
4078 async move {
4079 sleep(Duration::from_millis(100)).await;
4080
4081 server
4082 .mock_sync()
4083 .ok_and_run(&client, |builder| {
4084 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4085 })
4086 .await;
4087 }
4088 });
4089
4090 let room =
4091 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4092 assert_eq!(room.room_id(), room_id);
4093 }
4094
4095 #[async_test]
4096 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4097 let client = MockClientBuilder::new(None).build().await;
4098
4099 let room_id = room_id!("!room:example.org");
4100 // Room is not present so the client won't be able to find it. The call will
4101 // timeout.
4102 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4103 }
4104
4105 #[async_test]
4106 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4107 let server = MatrixMockServer::new().await;
4108 let client = server.client_builder().build().await;
4109
4110 server.mock_create_room().ok().mount().await;
4111
4112 // Create a room in the internal store
4113 let room = client
4114 .create_room(assign!(CreateRoomRequest::new(), {
4115 invite: vec![],
4116 is_direct: false,
4117 }))
4118 .await
4119 .unwrap();
4120
4121 // Room is locally present, but not synced, the call will timeout
4122 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4123 .await
4124 .unwrap_err();
4125 }
4126
4127 #[async_test]
4128 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4129 let server = MatrixMockServer::new().await;
4130 let client = server.client_builder().build().await;
4131
4132 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4133
4134 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4135 assert_matches!(ret, Ok(true));
4136 }
4137
4138 #[async_test]
4139 async fn test_is_room_alias_available_if_alias_is_resolved() {
4140 let server = MatrixMockServer::new().await;
4141 let client = server.client_builder().build().await;
4142
4143 server
4144 .mock_room_directory_resolve_alias()
4145 .ok("!some_room_id:matrix.org", Vec::new())
4146 .expect(1)
4147 .mount()
4148 .await;
4149
4150 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4151 assert_matches!(ret, Ok(false));
4152 }
4153
4154 #[async_test]
4155 async fn test_is_room_alias_available_if_error_found() {
4156 let server = MatrixMockServer::new().await;
4157 let client = server.client_builder().build().await;
4158
4159 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4160
4161 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4162 assert_matches!(ret, Err(_));
4163 }
4164
4165 #[async_test]
4166 async fn test_create_room_alias() {
4167 let server = MatrixMockServer::new().await;
4168 let client = server.client_builder().build().await;
4169
4170 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4171
4172 let ret = client
4173 .create_room_alias(
4174 room_alias_id!("#some_alias:matrix.org"),
4175 room_id!("!some_room:matrix.org"),
4176 )
4177 .await;
4178 assert_matches!(ret, Ok(()));
4179 }
4180
4181 #[async_test]
4182 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4183 let server = MatrixMockServer::new().await;
4184 let client = server.client_builder().build().await;
4185
4186 let room_id = room_id!("!a-room:matrix.org");
4187
4188 // Make sure the summary endpoint is called once
4189 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4190
4191 // We create a locally cached invited room
4192 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4193
4194 // And we get a preview, the server endpoint was reached
4195 let preview = client
4196 .get_room_preview(room_id.into(), Vec::new())
4197 .await
4198 .expect("Room preview should be retrieved");
4199
4200 assert_eq!(invited_room.room_id(), preview.room_id);
4201 }
4202
4203 #[async_test]
4204 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4205 let server = MatrixMockServer::new().await;
4206 let client = server.client_builder().build().await;
4207
4208 let room_id = room_id!("!a-room:matrix.org");
4209
4210 // Make sure the summary endpoint is called once
4211 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4212
4213 // We create a locally cached left room
4214 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4215
4216 // And we get a preview, the server endpoint was reached
4217 let preview = client
4218 .get_room_preview(room_id.into(), Vec::new())
4219 .await
4220 .expect("Room preview should be retrieved");
4221
4222 assert_eq!(left_room.room_id(), preview.room_id);
4223 }
4224
4225 #[async_test]
4226 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4227 let server = MatrixMockServer::new().await;
4228 let client = server.client_builder().build().await;
4229
4230 let room_id = room_id!("!a-room:matrix.org");
4231
4232 // Make sure the summary endpoint is called once
4233 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4234
4235 // We create a locally cached knocked room
4236 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4237
4238 // And we get a preview, the server endpoint was reached
4239 let preview = client
4240 .get_room_preview(room_id.into(), Vec::new())
4241 .await
4242 .expect("Room preview should be retrieved");
4243
4244 assert_eq!(knocked_room.room_id(), preview.room_id);
4245 }
4246
4247 #[async_test]
4248 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4249 let server = MatrixMockServer::new().await;
4250 let client = server.client_builder().build().await;
4251
4252 let room_id = room_id!("!a-room:matrix.org");
4253
4254 // Make sure the summary endpoint is not called
4255 server.mock_room_summary().ok(room_id).never().mount().await;
4256
4257 // We create a locally cached joined room
4258 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4259
4260 // And we get a preview, no server endpoint was reached
4261 let preview = client
4262 .get_room_preview(room_id.into(), Vec::new())
4263 .await
4264 .expect("Room preview should be retrieved");
4265
4266 assert_eq!(joined_room.room_id(), preview.room_id);
4267 }
4268
4269 #[async_test]
4270 async fn test_media_preview_config() {
4271 let server = MatrixMockServer::new().await;
4272 let client = server.client_builder().build().await;
4273
4274 server
4275 .mock_sync()
4276 .ok_and_run(&client, |builder| {
4277 builder.add_custom_global_account_data(json!({
4278 "content": {
4279 "media_previews": "private",
4280 "invite_avatars": "off"
4281 },
4282 "type": "m.media_preview_config"
4283 }));
4284 })
4285 .await;
4286
4287 let (initial_value, stream) =
4288 client.account().observe_media_preview_config().await.unwrap();
4289
4290 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4291 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4292 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4293 pin_mut!(stream);
4294 assert_pending!(stream);
4295
4296 server
4297 .mock_sync()
4298 .ok_and_run(&client, |builder| {
4299 builder.add_custom_global_account_data(json!({
4300 "content": {
4301 "media_previews": "off",
4302 "invite_avatars": "on"
4303 },
4304 "type": "m.media_preview_config"
4305 }));
4306 })
4307 .await;
4308
4309 assert_next_matches!(
4310 stream,
4311 MediaPreviewConfigEventContent {
4312 media_previews: Some(MediaPreviews::Off),
4313 invite_avatars: Some(InviteAvatars::On),
4314 ..
4315 }
4316 );
4317 assert_pending!(stream);
4318 }
4319
4320 #[async_test]
4321 async fn test_unstable_media_preview_config() {
4322 let server = MatrixMockServer::new().await;
4323 let client = server.client_builder().build().await;
4324
4325 server
4326 .mock_sync()
4327 .ok_and_run(&client, |builder| {
4328 builder.add_custom_global_account_data(json!({
4329 "content": {
4330 "media_previews": "private",
4331 "invite_avatars": "off"
4332 },
4333 "type": "io.element.msc4278.media_preview_config"
4334 }));
4335 })
4336 .await;
4337
4338 let (initial_value, stream) =
4339 client.account().observe_media_preview_config().await.unwrap();
4340
4341 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4342 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4343 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4344 pin_mut!(stream);
4345 assert_pending!(stream);
4346
4347 server
4348 .mock_sync()
4349 .ok_and_run(&client, |builder| {
4350 builder.add_custom_global_account_data(json!({
4351 "content": {
4352 "media_previews": "off",
4353 "invite_avatars": "on"
4354 },
4355 "type": "io.element.msc4278.media_preview_config"
4356 }));
4357 })
4358 .await;
4359
4360 assert_next_matches!(
4361 stream,
4362 MediaPreviewConfigEventContent {
4363 media_previews: Some(MediaPreviews::Off),
4364 invite_avatars: Some(InviteAvatars::On),
4365 ..
4366 }
4367 );
4368 assert_pending!(stream);
4369 }
4370
4371 #[async_test]
4372 async fn test_media_preview_config_not_found() {
4373 let server = MatrixMockServer::new().await;
4374 let client = server.client_builder().build().await;
4375
4376 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4377
4378 assert!(initial_value.is_none());
4379 }
4380
4381 #[async_test]
4382 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4383 // The default Matrix version we use is 1.11 or higher, so authenticated media
4384 // is supported.
4385 let server = MatrixMockServer::new().await;
4386 let client = server.client_builder().build().await;
4387
4388 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4389
4390 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4391 client.load_or_fetch_max_upload_size().await.unwrap();
4392
4393 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4394 }
4395
4396 #[async_test]
4397 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4398 // The server must advertise support for the stable feature for authenticated
4399 // media support, so we mock the `GET /versions` response.
4400 let server = MatrixMockServer::new().await;
4401 let client = server.client_builder().no_server_versions().build().await;
4402
4403 server
4404 .mock_versions()
4405 .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4406 .with_feature("org.matrix.msc3916.stable", true)
4407 .ok()
4408 .named("versions")
4409 .expect(1)
4410 .mount()
4411 .await;
4412
4413 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4414
4415 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4416 client.load_or_fetch_max_upload_size().await.unwrap();
4417
4418 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4419 }
4420
4421 #[async_test]
4422 async fn test_load_or_fetch_max_upload_size_no_auth() {
4423 // The server must not support Matrix 1.11 or higher for unauthenticated
4424 // media requests, so we mock the `GET /versions` response.
4425 let server = MatrixMockServer::new().await;
4426 let client = server.client_builder().no_server_versions().build().await;
4427
4428 server
4429 .mock_versions()
4430 .with_versions(vec!["v1.1"])
4431 .ok()
4432 .named("versions")
4433 .expect(1)
4434 .mount()
4435 .await;
4436
4437 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4438
4439 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4440 client.load_or_fetch_max_upload_size().await.unwrap();
4441
4442 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4443 }
4444
4445 #[async_test]
4446 async fn test_uploading_a_too_large_media_file() {
4447 let server = MatrixMockServer::new().await;
4448 let client = server.client_builder().build().await;
4449
4450 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4451 client.load_or_fetch_max_upload_size().await.unwrap();
4452 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4453
4454 let data = vec![1, 2];
4455 let upload_request =
4456 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4457 let request = SendRequest {
4458 client: client.clone(),
4459 request: upload_request,
4460 config: None,
4461 send_progress: SharedObservable::new(TransmissionProgress::default()),
4462 };
4463 let media_request = SendMediaUploadRequest::new(request);
4464
4465 let error = media_request.await.err();
4466 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4467 assert_eq!(max, uint!(1));
4468 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4469 }
4470
4471 #[async_test]
4472 async fn test_dont_ignore_timeout_on_first_sync() {
4473 let server = MatrixMockServer::new().await;
4474 let client = server.client_builder().build().await;
4475
4476 server
4477 .mock_sync()
4478 .timeout(Some(Duration::from_secs(30)))
4479 .ok(|_| {})
4480 .mock_once()
4481 .named("sync_with_timeout")
4482 .mount()
4483 .await;
4484
4485 // Call the endpoint once to check the timeout.
4486 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4487
4488 timeout(Duration::from_secs(1), async {
4489 stream.next().await.unwrap().unwrap();
4490 })
4491 .await
4492 .unwrap();
4493 }
4494
4495 #[async_test]
4496 async fn test_ignore_timeout_on_first_sync() {
4497 let server = MatrixMockServer::new().await;
4498 let client = server.client_builder().build().await;
4499
4500 server
4501 .mock_sync()
4502 .timeout(None)
4503 .ok(|_| {})
4504 .mock_once()
4505 .named("sync_no_timeout")
4506 .mount()
4507 .await;
4508 server
4509 .mock_sync()
4510 .timeout(Some(Duration::from_secs(30)))
4511 .ok(|_| {})
4512 .mock_once()
4513 .named("sync_with_timeout")
4514 .mount()
4515 .await;
4516
4517 // Call each version of the endpoint once to check the timeouts.
4518 let mut stream = Box::pin(
4519 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4520 );
4521
4522 timeout(Duration::from_secs(1), async {
4523 stream.next().await.unwrap().unwrap();
4524 stream.next().await.unwrap().unwrap();
4525 })
4526 .await
4527 .unwrap();
4528 }
4529
4530 #[async_test]
4531 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4532 let server = MatrixMockServer::new().await;
4533 let client = server.client_builder().build().await;
4534 // This is the user ID that is inside MemberAdditional.
4535 // Note the confusing username, so we can share
4536 // GlobalAccountDataTestEvent::Direct with the invited test.
4537 let user_id = user_id!("@invited:localhost");
4538
4539 // When we receive a sync response saying "invited" is invited to a DM
4540 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4541 let response = SyncResponseBuilder::default()
4542 .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4543 .add_global_account_data(
4544 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4545 )
4546 .build_sync_response();
4547 client.base_client().receive_sync_response(response).await.unwrap();
4548
4549 // Then get_dm_room finds this room
4550 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4551 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4552 }
4553
4554 #[async_test]
4555 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4556 let server = MatrixMockServer::new().await;
4557 let client = server.client_builder().build().await;
4558 // This is the user ID that is inside MemberInvite
4559 let user_id = user_id!("@invited:localhost");
4560
4561 // When we receive a sync response saying "invited" is invited to a DM
4562 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4563 let response = SyncResponseBuilder::default()
4564 .add_joined_room(
4565 JoinedRoomBuilder::default()
4566 .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4567 )
4568 .add_global_account_data(
4569 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4570 )
4571 .build_sync_response();
4572 client.base_client().receive_sync_response(response).await.unwrap();
4573
4574 // Then get_dm_room finds this room
4575 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4576 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4577 }
4578
4579 #[async_test]
4580 async fn test_get_dm_room_still_finds_left_room() {
4581 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4582 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4583
4584 let server = MatrixMockServer::new().await;
4585 let client = server.client_builder().build().await;
4586 // This is the user ID that is inside MemberAdditional.
4587 // Note the confusing username, so we can share
4588 // GlobalAccountDataTestEvent::Direct with the invited test.
4589 let user_id = user_id!("@invited:localhost");
4590
4591 // When we receive a sync response saying "invited" has left a DM
4592 let f = EventFactory::new().sender(user_id);
4593 let response = SyncResponseBuilder::default()
4594 .add_joined_room(
4595 JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4596 )
4597 .add_global_account_data(
4598 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4599 )
4600 .build_sync_response();
4601 client.base_client().receive_sync_response(response).await.unwrap();
4602
4603 // Then get_dm_room finds this room
4604 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4605 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4606 }
4607
4608 #[async_test]
4609 async fn test_device_exists() {
4610 let server = MatrixMockServer::new().await;
4611 let client = server.client_builder().build().await;
4612
4613 server.mock_get_device().ok().expect(1).mount().await;
4614
4615 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4616 }
4617
4618 #[async_test]
4619 async fn test_device_exists_404() {
4620 let server = MatrixMockServer::new().await;
4621 let client = server.client_builder().build().await;
4622
4623 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4624 }
4625
4626 #[async_test]
4627 async fn test_device_exists_500() {
4628 let server = MatrixMockServer::new().await;
4629 let client = server.client_builder().build().await;
4630
4631 server.mock_get_device().error500().expect(1).mount().await;
4632
4633 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4634 }
4635
4636 #[async_test]
4637 async fn test_fetching_well_known_with_homeserver_url() {
4638 let server = MatrixMockServer::new().await;
4639 let client = server.client_builder().build().await;
4640 server.mock_well_known().ok().mount().await;
4641
4642 assert_matches!(client.fetch_client_well_known().await, Some(_));
4643 }
4644
4645 #[async_test]
4646 async fn test_fetching_well_known_with_server_name() {
4647 let server = MatrixMockServer::new().await;
4648 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4649
4650 server.mock_well_known().ok().mount().await;
4651
4652 let client = MockClientBuilder::new(None)
4653 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4654 .build()
4655 .await;
4656
4657 assert_matches!(client.fetch_client_well_known().await, Some(_));
4658 }
4659
4660 #[async_test]
4661 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4662 let server = MatrixMockServer::new().await;
4663 server.mock_well_known().ok().mount().await;
4664
4665 let user_id =
4666 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4667 let client = MockClientBuilder::new(None)
4668 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4669 .build()
4670 .await;
4671
4672 assert_matches!(client.fetch_client_well_known().await, Some(_));
4673 }
4674}