matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
32use matrix_sdk_base::{
33 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
34 StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
35 event_cache::store::EventCacheStoreLock,
36 media::store::MediaStoreLock,
37 store::{
38 DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
39 WellKnownResponse,
40 },
41 sync::{Notification, RoomUpdates},
42 task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::ttl_cache::TtlCache;
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50 api::{
51 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52 auth_scheme::{AuthScheme, SendAccessToken},
53 client::{
54 account::whoami,
55 alias::{create_alias, delete_alias, get_alias},
56 authenticated_media,
57 device::{self, delete_devices, get_devices, update_device},
58 directory::{get_public_rooms, get_public_rooms_filtered},
59 discovery::{
60 discover_homeserver::{self, RtcFocusInfo},
61 get_capabilities::{self, v3::Capabilities},
62 get_supported_versions,
63 },
64 error::ErrorKind,
65 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
66 knock::knock_room,
67 media,
68 membership::{join_room_by_id, join_room_by_id_or_alias},
69 room::create_room,
70 session::login::v3::DiscoveryInfo,
71 sync::sync_events,
72 threads::get_thread_subscriptions_changes,
73 uiaa,
74 user_directory::search_users,
75 },
76 error::FromHttpResponseError,
77 path_builder::PathBuilder,
78 },
79 assign,
80 events::direct::DirectUserIdentifier,
81 push::Ruleset,
82 time::Instant,
83};
84use serde::de::DeserializeOwned;
85use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
86use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
87use url::Url;
88
89use self::{
90 caches::{CachedValue, ClientCaches},
91 futures::SendRequest,
92};
93use crate::{
94 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
95 Room, SessionTokens, TransmissionProgress,
96 authentication::{
97 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
98 oauth::OAuth,
99 },
100 client::thread_subscriptions::ThreadSubscriptionCatchup,
101 config::{RequestConfig, SyncToken},
102 deduplicating_handler::DeduplicatingHandler,
103 error::HttpResult,
104 event_cache::EventCache,
105 event_handler::{
106 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
107 EventHandlerStore, ObservableEventHandler, SyncEvent,
108 },
109 http_client::{HttpClient, SupportedPathBuilder},
110 latest_events::LatestEvents,
111 media::MediaError,
112 notification_settings::NotificationSettings,
113 room::RoomMember,
114 room_preview::RoomPreview,
115 send_queue::{SendQueue, SendQueueData},
116 sliding_sync::Version as SlidingSyncVersion,
117 sync::{RoomUpdate, SyncResponse},
118};
119#[cfg(feature = "e2e-encryption")]
120use crate::{
121 cross_process_lock::CrossProcessLock,
122 encryption::{
123 DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
124 VerificationState,
125 },
126};
127
128mod builder;
129pub(crate) mod caches;
130pub(crate) mod futures;
131pub(crate) mod thread_subscriptions;
132
133pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
134#[cfg(feature = "experimental-search")]
135use crate::search_index::SearchIndex;
136
137#[cfg(not(target_family = "wasm"))]
138type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
139#[cfg(target_family = "wasm")]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
141
142#[cfg(not(target_family = "wasm"))]
143type NotificationHandlerFn =
144 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
145#[cfg(target_family = "wasm")]
146type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
147
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
161
/// Represents changes that can occur to a `Client`'s session.
#[derive(Debug, Clone, PartialEq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
173
/// Information about the server vendor obtained from the federation API.
///
/// Both fields are plain strings; `Client::server_vendor_info` fills a field
/// with `"unknown"` when the server did not report it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The server name.
    pub server_name: String,
    /// The server version.
    pub version: String,
}
183
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
#[derive(Clone)]
pub struct Client {
    /// The shared state; every clone of a `Client` points to the same
    /// [`ClientInner`].
    pub(crate) inner: Arc<ClientInner>,
}
191
/// Collection of locks and deduplicating handlers used by individual client
/// methods, either to ensure that only a single call to a method happens at
/// once or to deduplicate multiple calls to a method.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): `send_single_receipt` is not defined on `ClientLocks`, so
    // the `Self::` link above presumably targets a method on another type —
    // confirm and fix the link.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock over the crypto store.
    ///
    /// Stored in a `OnceCell`, so it starts out unset (the struct is built
    /// through `Default`) and can be initialized at most once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
270
/// The state shared by all clones of a [`Client`].
///
/// A [`Client`] only holds an `Arc<ClientInner>`; everything the client needs
/// (authentication context, HTTP client, stores, caches, locks, handlers…)
/// lives here.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    ///
    /// Behind a lock because the URL can be replaced after construction (see
    /// `Client::set_homeserver`).
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    ///
    /// Behind a lock because it can be overridden after construction (see
    /// `Client::set_sliding_sync_version`).
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    ///
    /// One broadcast channel per room, keyed by room ID.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    ///
    /// The inner `OnceCell` starts out empty (see `ClientInner::new`).
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    #[cfg(feature = "experimental-search")]
    /// Handler for [`RoomIndex`]'s of each room
    search_index: SearchIndex,

    /// A monitor for background tasks spawned by the client.
    pub(crate) task_monitor: TaskMonitor,

    /// A sender to notify subscribers about duplicate key upload errors
    /// triggered by requests to /keys/upload.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) duplicate_key_upload_error_sender:
        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
}
400
impl ClientInner {
    /// Create a new `ClientInner`.
    ///
    /// All the fields passed as parameters here are those that must be cloned
    /// upon instantiation of a sub-client, e.g. a client specialized for
    /// notifications.
    ///
    /// Every other field is created fresh (default value, new channel, new
    /// task monitor…). The returned value is already wrapped in an `Arc`, and
    /// the event cache and the thread subscription catch-up service are
    /// initialized before returning if their cells were handed over empty.
    #[allow(clippy::too_many_arguments)]
    async fn new(
        auth_ctx: Arc<AuthCtx>,
        server: Option<Url>,
        homeserver: Url,
        sliding_sync_version: SlidingSyncVersion,
        http_client: HttpClient,
        base_client: BaseClient,
        supported_versions: CachedValue<SupportedVersions>,
        well_known: CachedValue<Option<WellKnownResponse>>,
        respect_login_well_known: bool,
        event_cache: OnceCell<EventCache>,
        send_queue: Arc<SendQueueData>,
        latest_events: OnceCell<LatestEvents>,
        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
        cross_process_store_locks_holder_name: String,
        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
    ) -> Arc<Self> {
        // Seed the in-memory caches with the values handed over by the
        // caller; the server metadata cache starts from a fresh `TtlCache`.
        let caches = ClientCaches {
            supported_versions: supported_versions.into(),
            well_known: well_known.into(),
            server_metadata: Mutex::new(TtlCache::new()),
        };

        let client = Self {
            server,
            homeserver: StdRwLock::new(homeserver),
            auth_ctx,
            sliding_sync_version: StdRwLock::new(sliding_sync_version),
            http_client,
            base_client,
            caches,
            locks: Default::default(),
            cross_process_store_locks_holder_name,
            typing_notice_times: Default::default(),
            event_handlers: Default::default(),
            notification_handlers: Default::default(),
            room_update_channels: Default::default(),
            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
            // ballast for all observers to catch up.
            room_updates_sender: broadcast::Sender::new(32),
            respect_login_well_known,
            sync_beat: event_listener::Event::new(),
            event_cache,
            send_queue_data: send_queue,
            latest_events,
            #[cfg(feature = "e2e-encryption")]
            e2ee: EncryptionData::new(encryption_settings),
            #[cfg(feature = "e2e-encryption")]
            verification_state: SharedObservable::new(VerificationState::Unknown),
            #[cfg(feature = "e2e-encryption")]
            enable_share_history_on_invite,
            server_max_upload_size: Mutex::new(OnceCell::new()),
            #[cfg(feature = "experimental-search")]
            search_index: search_index_handler,
            thread_subscription_catchup,
            task_monitor: TaskMonitor::new(),
            // Channel of capacity 1; only the sender half is kept, the
            // initial receiver half is dropped right away.
            #[cfg(feature = "e2e-encryption")]
            duplicate_key_upload_error_sender: broadcast::channel(1).0,
        };

        // NOTE(review): more statements follow this binding, so the
        // `let_and_return` allow looks like a leftover — confirm before
        // removing.
        #[allow(clippy::let_and_return)]
        let client = Arc::new(client);

        #[cfg(feature = "e2e-encryption")]
        client.e2ee.initialize_tasks(&client);

        // Only creates a new `EventCache` if the cell handed over by the
        // caller is still empty; an already-initialized cell is kept as is.
        let _ = client
            .event_cache
            .get_or_init(|| async {
                EventCache::new(&client, client.base_client.event_cache_store().clone())
            })
            .await;

        // Same for the thread subscription catch-up service, which receives
        // a `Client` wrapping a clone of this very `Arc`.
        let _ = client
            .thread_subscription_catchup
            .get_or_init(|| async {
                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
            })
            .await;

        client
    }
}
493
494#[cfg(not(tarpaulin_include))]
495impl Debug for Client {
496 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
497 write!(fmt, "Client")
498 }
499}
500
501impl Client {
502 /// Create a new [`Client`] that will use the given homeserver.
503 ///
504 /// # Arguments
505 ///
506 /// * `homeserver_url` - The homeserver that the client should connect to.
507 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
508 Self::builder().homeserver_url(homeserver_url).build().await
509 }
510
511 /// Returns a subscriber that publishes an event every time the ignore user
512 /// list changes.
513 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
514 self.inner.base_client.subscribe_to_ignore_user_list_changes()
515 }
516
517 /// Create a new [`ClientBuilder`].
518 pub fn builder() -> ClientBuilder {
519 ClientBuilder::new()
520 }
521
522 pub(crate) fn base_client(&self) -> &BaseClient {
523 &self.inner.base_client
524 }
525
526 /// The underlying HTTP client.
527 pub fn http_client(&self) -> &reqwest::Client {
528 &self.inner.http_client.inner
529 }
530
531 pub(crate) fn locks(&self) -> &ClientLocks {
532 &self.inner.locks
533 }
534
535 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
536 &self.inner.auth_ctx
537 }
538
539 /// The cross-process store locks holder name.
540 ///
541 /// The SDK provides cross-process store locks (see
542 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
543 /// `holder_name` is the value used for all cross-process store locks
544 /// used by this `Client`.
545 pub fn cross_process_store_locks_holder_name(&self) -> &str {
546 &self.inner.cross_process_store_locks_holder_name
547 }
548
549 /// Change the homeserver URL used by this client.
550 ///
551 /// # Arguments
552 ///
553 /// * `homeserver_url` - The new URL to use.
554 fn set_homeserver(&self, homeserver_url: Url) {
555 *self.inner.homeserver.write().unwrap() = homeserver_url;
556 }
557
/// Point the client at another homeserver, then resolve well-known again.
///
/// The given URL is applied first and the well-known data is reset. If
/// well-known data is then available, the homeserver URL is replaced once
/// more by the base URL it advertises.
#[cfg(feature = "e2e-encryption")]
pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
    &self,
    homeserver_url: Url,
) -> Result<()> {
    self.set_homeserver(homeserver_url);
    self.reset_well_known().await?;

    let Some(well_known) = self.load_or_fetch_well_known().await? else {
        // Nothing advertised: keep the URL we were given.
        return Ok(());
    };

    let advertised = Url::parse(&well_known.homeserver.base_url)?;
    self.set_homeserver(advertised);
    Ok(())
}
571
572 /// Get the capabilities of the homeserver.
573 ///
574 /// This method should be used to check what features are supported by the
575 /// homeserver.
576 ///
577 /// # Examples
578 ///
579 /// ```no_run
580 /// # use matrix_sdk::Client;
581 /// # use url::Url;
582 /// # async {
583 /// # let homeserver = Url::parse("http://example.com")?;
584 /// let client = Client::new(homeserver).await?;
585 ///
586 /// let capabilities = client.get_capabilities().await?;
587 ///
588 /// if capabilities.change_password.enabled {
589 /// // Change password
590 /// }
591 /// # anyhow::Ok(()) };
592 /// ```
593 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
594 let res = self.send(get_capabilities::v3::Request::new()).await?;
595 Ok(res.capabilities)
596 }
597
/// Fetch the server vendor information through the federation API.
///
/// This calls the `/_matrix/federation/v1/version` endpoint, which reports
/// the server's software name and version. A field the server did not
/// report is returned as `"unknown"`.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::Client;
/// # use url::Url;
/// # async {
/// # let homeserver = Url::parse("http://example.com")?;
/// let client = Client::new(homeserver).await?;
///
/// let server_info = client.server_vendor_info(None).await?;
/// println!(
///     "Server: {}, Version: {}",
///     server_info.server_name, server_info.version
/// );
/// # anyhow::Ok(()) };
/// ```
#[cfg(feature = "federation-api")]
pub async fn server_vendor_info(
    &self,
    request_config: Option<RequestConfig>,
) -> HttpResult<ServerVendorInfo> {
    use ruma::api::federation::discovery::get_server_version;

    let request = get_server_version::v1::Request::new();
    let response = self.send_inner(request, request_config, Default::default()).await?;

    // Placeholder for anything the server left out of its answer.
    let unknown = || "unknown".to_owned();

    let server = response.server.unwrap_or_default();
    Ok(ServerVendorInfo {
        server_name: server.name.unwrap_or_else(unknown),
        version: server.version.unwrap_or_else(unknown),
    })
}
637
638 /// Get a copy of the default request config.
639 ///
640 /// The default request config is what's used when sending requests if no
641 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
642 /// function with such a parameter.
643 ///
644 /// If the default request config was not customized through
645 /// [`ClientBuilder`] when creating this `Client`, the returned value will
646 /// be equivalent to [`RequestConfig::default()`].
647 pub fn request_config(&self) -> RequestConfig {
648 self.inner.http_client.request_config
649 }
650
651 /// Check whether the client has been activated.
652 ///
653 /// A client is considered active when:
654 ///
655 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
656 /// is logged in,
657 /// 2. Has loaded cached data from storage,
658 /// 3. If encryption is enabled, it also initialized or restored its
659 /// `OlmMachine`.
660 pub fn is_active(&self) -> bool {
661 self.inner.base_client.is_active()
662 }
663
664 /// The server used by the client.
665 ///
666 /// See `Self::server` to learn more.
667 pub fn server(&self) -> Option<&Url> {
668 self.inner.server.as_ref()
669 }
670
671 /// The homeserver of the client.
672 pub fn homeserver(&self) -> Url {
673 self.inner.homeserver.read().unwrap().clone()
674 }
675
676 /// Get the sliding sync version.
677 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
678 self.inner.sliding_sync_version.read().unwrap().clone()
679 }
680
681 /// Override the sliding sync version.
682 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
683 let mut lock = self.inner.sliding_sync_version.write().unwrap();
684 *lock = version;
685 }
686
687 /// Get the Matrix user session meta information.
688 ///
689 /// If the client is currently logged in, this will return a
690 /// [`SessionMeta`] object which contains the user ID and device ID.
691 /// Otherwise it returns `None`.
692 pub fn session_meta(&self) -> Option<&SessionMeta> {
693 self.base_client().session_meta()
694 }
695
696 /// Returns a receiver that gets events for each room info update. To watch
697 /// for new events, use `receiver.resubscribe()`.
698 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
699 self.base_client().room_info_notable_update_receiver()
700 }
701
702 /// Performs a search for users.
703 /// The search is performed case-insensitively on user IDs and display names
704 ///
705 /// # Arguments
706 ///
707 /// * `search_term` - The search term for the search
708 /// * `limit` - The maximum number of results to return. Defaults to 10.
709 ///
710 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
711 pub async fn search_users(
712 &self,
713 search_term: &str,
714 limit: u64,
715 ) -> HttpResult<search_users::v3::Response> {
716 let mut request = search_users::v3::Request::new(search_term.to_owned());
717
718 if let Some(limit) = UInt::new(limit) {
719 request.limit = limit;
720 }
721
722 self.send(request).await
723 }
724
725 /// Get the user id of the current owner of the client.
726 pub fn user_id(&self) -> Option<&UserId> {
727 self.session_meta().map(|s| s.user_id.as_ref())
728 }
729
730 /// Get the device ID that identifies the current session.
731 pub fn device_id(&self) -> Option<&DeviceId> {
732 self.session_meta().map(|s| s.device_id.as_ref())
733 }
734
735 /// Get the current access token for this session.
736 ///
737 /// Will be `None` if the client has not been logged in.
738 pub fn access_token(&self) -> Option<String> {
739 self.auth_ctx().access_token()
740 }
741
742 /// Get the current tokens for this session.
743 ///
744 /// To be notified of changes in the session tokens, use
745 /// [`Client::subscribe_to_session_changes()`] or
746 /// [`Client::set_session_callbacks()`].
747 ///
748 /// Returns `None` if the client has not been logged in.
749 pub fn session_tokens(&self) -> Option<SessionTokens> {
750 self.auth_ctx().session_tokens()
751 }
752
753 /// Access the authentication API used to log in this client.
754 ///
755 /// Will be `None` if the client has not been logged in.
756 pub fn auth_api(&self) -> Option<AuthApi> {
757 match self.auth_ctx().auth_data.get()? {
758 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
759 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
760 }
761 }
762
763 /// Get the whole session info of this client.
764 ///
765 /// Will be `None` if the client has not been logged in.
766 ///
767 /// Can be used with [`Client::restore_session`] to restore a previously
768 /// logged-in session.
769 pub fn session(&self) -> Option<AuthSession> {
770 match self.auth_api()? {
771 AuthApi::Matrix(api) => api.session().map(Into::into),
772 AuthApi::OAuth(api) => api.full_session().map(Into::into),
773 }
774 }
775
776 /// Get a reference to the state store.
777 pub fn state_store(&self) -> &DynStateStore {
778 self.base_client().state_store()
779 }
780
781 /// Get a reference to the event cache store.
782 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
783 self.base_client().event_cache_store()
784 }
785
786 /// Get a reference to the media store.
787 pub fn media_store(&self) -> &MediaStoreLock {
788 self.base_client().media_store()
789 }
790
791 /// Access the native Matrix authentication API with this client.
792 pub fn matrix_auth(&self) -> MatrixAuth {
793 MatrixAuth::new(self.clone())
794 }
795
796 /// Get the account of the current owner of the client.
797 pub fn account(&self) -> Account {
798 Account::new(self.clone())
799 }
800
/// The encryption manager of the client, bound to a clone of this client.
#[cfg(feature = "e2e-encryption")]
pub fn encryption(&self) -> Encryption {
    let client = self.clone();
    Encryption::new(client)
}
806
807 /// Get the media manager of the client.
808 pub fn media(&self) -> Media {
809 Media::new(self.clone())
810 }
811
812 /// Get the pusher manager of the client.
813 pub fn pusher(&self) -> Pusher {
814 Pusher::new(self.clone())
815 }
816
817 /// Access the OAuth 2.0 API of the client.
818 pub fn oauth(&self) -> OAuth {
819 OAuth::new(self.clone())
820 }
821
822 /// Register a handler for a specific event type.
823 ///
824 /// The handler is a function or closure with one or more arguments. The
825 /// first argument is the event itself. All additional arguments are
826 /// "context" arguments: They have to implement [`EventHandlerContext`].
827 /// This trait is named that way because most of the types implementing it
828 /// give additional context about an event: The room it was in, its raw form
829 /// and other similar things. As two exceptions to this,
830 /// [`Client`] and [`EventHandlerHandle`] also implement the
831 /// `EventHandlerContext` trait so you don't have to clone your client
832 /// into the event handler manually and a handler can decide to remove
833 /// itself.
834 ///
835 /// Some context arguments are not universally applicable. A context
836 /// argument that isn't available for the given event type will result in
837 /// the event handler being skipped and an error being logged. The following
838 /// context argument types are only available for a subset of event types:
839 ///
840 /// * [`Room`] is only available for room-specific events, i.e. not for
841 /// events like global account data events or presence events.
842 ///
843 /// You can provide custom context via
844 /// [`add_event_handler_context`](Client::add_event_handler_context) and
845 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
846 /// into the event handler.
847 ///
848 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
849 ///
850 /// # Examples
851 ///
852 /// ```no_run
853 /// use matrix_sdk::{
854 /// deserialized_responses::EncryptionInfo,
855 /// event_handler::Ctx,
856 /// ruma::{
857 /// events::{
858 /// macros::EventContent,
859 /// push_rules::PushRulesEvent,
860 /// room::{
861 /// message::SyncRoomMessageEvent,
862 /// topic::SyncRoomTopicEvent,
863 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
864 /// },
865 /// },
866 /// push::Action,
867 /// Int, MilliSecondsSinceUnixEpoch,
868 /// },
869 /// Client, Room,
870 /// };
871 /// use serde::{Deserialize, Serialize};
872 ///
873 /// # async fn example(client: Client) {
874 /// client.add_event_handler(
875 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
876 /// // Common usage: Room event plus room and client.
877 /// },
878 /// );
879 /// client.add_event_handler(
880 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
881 /// async move {
882 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
883 /// // unencrypted events and events that were decrypted by the SDK.
884 /// }
885 /// },
886 /// );
887 /// client.add_event_handler(
888 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
889 /// async move {
890 /// // A `Vec<Action>` parameter allows you to know which push actions
891 /// // are applicable for an event. For example, an event with
892 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
893 /// // in the timeline.
894 /// }
895 /// },
896 /// );
897 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
898 /// // You can omit any or all arguments after the first.
899 /// });
900 ///
901 /// // Registering a temporary event handler:
902 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
903 /// /* Event handler */
904 /// });
905 /// client.remove_event_handler(handle);
906 ///
907 /// // Registering custom event handler context:
908 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
909 /// struct MyContext {
910 /// number: usize,
911 /// }
912 /// client.add_event_handler_context(MyContext { number: 5 });
913 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
914 /// // Use the context
915 /// });
916 ///
917 /// // This will handle membership events in joined rooms. Invites are special, see below.
918 /// client.add_event_handler(
919 /// |ev: SyncRoomMemberEvent| async move {},
920 /// );
921 ///
922 /// // To handle state events in invited rooms (including invite membership events),
923 /// // `StrippedRoomMemberEvent` should be used.
924 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
925 /// client.add_event_handler(
926 /// |ev: StrippedRoomMemberEvent| async move {},
927 /// );
928 ///
929 /// // Custom events work exactly the same way, you just need to declare
930 /// // the content struct and use the EventContent derive macro on it.
931 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
932 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
933 /// struct TokenEventContent {
934 /// token: String,
935 /// #[serde(rename = "exp")]
936 /// expires_at: MilliSecondsSinceUnixEpoch,
937 /// }
938 ///
939 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
940 /// todo!("Display the token");
941 /// });
942 ///
943 /// // Event handler closures can also capture local variables.
944 /// // Make sure they are cheap to clone though, because they will be cloned
945 /// // every time the closure is called.
946 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
947 ///
948 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
949 /// println!("Calling the handler with identifier {data}");
950 /// });
951 /// # }
952 /// ```
953 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
954 where
955 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
956 H: EventHandler<Ev, Ctx>,
957 {
958 self.add_event_handler_impl(handler, None)
959 }
960
961 /// Register a handler for a specific room, and event type.
962 ///
963 /// This method works the same way as
964 /// [`add_event_handler`][Self::add_event_handler], except that the handler
965 /// will only be called for events in the room with the specified ID. See
966 /// that method for more details on event handler functions.
967 ///
968 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
969 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
970 /// your use case.
971 pub fn add_room_event_handler<Ev, Ctx, H>(
972 &self,
973 room_id: &RoomId,
974 handler: H,
975 ) -> EventHandlerHandle
976 where
977 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
978 H: EventHandler<Ev, Ctx>,
979 {
980 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
981 }
982
983 /// Observe a specific event type.
984 ///
985 /// `Ev` represents the kind of event that will be observed. `Ctx`
986 /// represents the context that will come with the event. It relies on the
987 /// same mechanism as [`Client::add_event_handler`]. The main difference is
988 /// that it returns an [`ObservableEventHandler`] and doesn't require a
989 /// user-defined closure. It is possible to subscribe to the
990 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
991 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
992 /// Ctx)`.
993 ///
994 /// Be careful that only the most recent value can be observed. Subscribers
995 /// are notified when a new value is sent, but there is no guarantee
996 /// that they will see all values.
997 ///
998 /// # Example
999 ///
1000 /// Let's see a classical usage:
1001 ///
1002 /// ```
1003 /// use futures_util::StreamExt as _;
1004 /// use matrix_sdk::{
1005 /// Client, Room,
1006 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
1007 /// };
1008 ///
1009 /// # async fn example(client: Client) -> Option<()> {
1010 /// let observer =
1011 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1012 ///
1013 /// let mut subscriber = observer.subscribe();
1014 ///
1015 /// let (event, (room, push_actions)) = subscriber.next().await?;
1016 /// # Some(())
1017 /// # }
1018 /// ```
1019 ///
1020 /// Now let's see how to get several contexts that can be useful for you:
1021 ///
1022 /// ```
1023 /// use matrix_sdk::{
1024 /// Client, Room,
1025 /// deserialized_responses::EncryptionInfo,
1026 /// ruma::{
1027 /// events::room::{
1028 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1029 /// },
1030 /// push::Action,
1031 /// },
1032 /// };
1033 ///
1034 /// # async fn example(client: Client) {
1035 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1036 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1037 ///
1038 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1039 /// // to distinguish between unencrypted events and events that were decrypted
1040 /// // by the SDK.
1041 /// let _ = client
1042 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1043 /// );
1044 ///
1045 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1046 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1047 /// // should be highlighted in the timeline.
1048 /// let _ =
1049 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1050 ///
1051 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1052 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1053 /// # }
1054 /// ```
1055 ///
1056 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1057 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1058 where
1059 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1060 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1061 {
1062 self.observe_room_events_impl(None)
1063 }
1064
1065 /// Observe a specific room, and event type.
1066 ///
1067 /// This method works the same way as [`Client::observe_events`], except
1068 /// that the observability will only be applied for events in the room with
1069 /// the specified ID. See that method for more details.
1070 ///
1071 /// Be careful that only the most recent value can be observed. Subscribers
1072 /// are notified when a new value is sent, but there is no guarantee
1073 /// that they will see all values.
1074 pub fn observe_room_events<Ev, Ctx>(
1075 &self,
1076 room_id: &RoomId,
1077 ) -> ObservableEventHandler<(Ev, Ctx)>
1078 where
1079 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1080 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1081 {
1082 self.observe_room_events_impl(Some(room_id.to_owned()))
1083 }
1084
1085 /// Shared implementation for `Client::observe_events` and
1086 /// `Client::observe_room_events`.
1087 fn observe_room_events_impl<Ev, Ctx>(
1088 &self,
1089 room_id: Option<OwnedRoomId>,
1090 ) -> ObservableEventHandler<(Ev, Ctx)>
1091 where
1092 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1093 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1094 {
1095 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1096 // new value.
1097 let shared_observable = SharedObservable::new(None);
1098
1099 ObservableEventHandler::new(
1100 shared_observable.clone(),
1101 self.event_handler_drop_guard(self.add_event_handler_impl(
1102 move |event: Ev, context: Ctx| {
1103 shared_observable.set(Some((event, context)));
1104
1105 ready(())
1106 },
1107 room_id,
1108 )),
1109 )
1110 }
1111
1112 /// Remove the event handler associated with the handle.
1113 ///
1114 /// Note that you **must not** call `remove_event_handler` from the
1115 /// non-async part of an event handler, that is:
1116 ///
1117 /// ```ignore
1118 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1119 /// // ⚠ this will cause a deadlock ⚠
1120 /// client.remove_event_handler(handle);
1121 ///
1122 /// async move {
1123 /// // removing the event handler here is fine
1124 /// client.remove_event_handler(handle);
1125 /// }
1126 /// })
1127 /// ```
1128 ///
1129 /// Note also that handlers that remove themselves will still execute with
1130 /// events received in the same sync cycle.
1131 ///
1132 /// # Arguments
1133 ///
1134 /// `handle` - The [`EventHandlerHandle`] that is returned when
1135 /// registering the event handler with [`Client::add_event_handler`].
1136 ///
1137 /// # Examples
1138 ///
1139 /// ```no_run
1140 /// # use url::Url;
1141 /// # use tokio::sync::mpsc;
1142 /// #
1143 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1144 /// #
1145 /// use matrix_sdk::{
1146 /// Client, event_handler::EventHandlerHandle,
1147 /// ruma::events::room::member::SyncRoomMemberEvent,
1148 /// };
1149 /// #
1150 /// # futures_executor::block_on(async {
1151 /// # let client = matrix_sdk::Client::builder()
1152 /// # .homeserver_url(homeserver)
1153 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1154 /// # .build()
1155 /// # .await
1156 /// # .unwrap();
1157 ///
1158 /// client.add_event_handler(
1159 /// |ev: SyncRoomMemberEvent,
1160 /// client: Client,
1161 /// handle: EventHandlerHandle| async move {
1162 /// // Common usage: Check arriving Event is the expected one
1163 /// println!("Expected RoomMemberEvent received!");
1164 /// client.remove_event_handler(handle);
1165 /// },
1166 /// );
1167 /// # });
1168 /// ```
1169 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1170 self.inner.event_handlers.remove(handle);
1171 }
1172
1173 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1174 /// the given handle.
1175 ///
1176 /// When the returned value is dropped, the event handler will be removed.
1177 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1178 EventHandlerDropGuard::new(handle, self.clone())
1179 }
1180
1181 /// Add an arbitrary value for use as event handler context.
1182 ///
1183 /// The value can be obtained in an event handler by adding an argument of
1184 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1185 ///
1186 /// If a value of the same type has been added before, it will be
1187 /// overwritten.
1188 ///
1189 /// # Examples
1190 ///
1191 /// ```no_run
1192 /// use matrix_sdk::{
1193 /// Room, event_handler::Ctx,
1194 /// ruma::events::room::message::SyncRoomMessageEvent,
1195 /// };
1196 /// # #[derive(Clone)]
1197 /// # struct SomeType;
1198 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1199 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1200 /// # futures_executor::block_on(async {
1201 /// # let client = matrix_sdk::Client::builder()
1202 /// # .homeserver_url(homeserver)
1203 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1204 /// # .build()
1205 /// # .await
1206 /// # .unwrap();
1207 ///
1208 /// // Handle used to send messages to the UI part of the app
1209 /// let my_gui_handle: SomeType = obtain_gui_handle();
1210 ///
1211 /// client.add_event_handler_context(my_gui_handle.clone());
1212 /// client.add_event_handler(
1213 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1214 /// async move {
1215 /// // gui_handle.send(DisplayMessage { message: ev });
1216 /// }
1217 /// },
1218 /// );
1219 /// # });
1220 /// ```
1221 pub fn add_event_handler_context<T>(&self, ctx: T)
1222 where
1223 T: Clone + Send + Sync + 'static,
1224 {
1225 self.inner.event_handlers.add_context(ctx);
1226 }
1227
1228 /// Register a handler for a notification.
1229 ///
1230 /// Similar to [`Client::add_event_handler`], but only allows functions
1231 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1232 /// [`Client`] for now.
1233 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1234 where
1235 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1236 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1237 {
1238 self.inner.notification_handlers.write().await.push(Box::new(
1239 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1240 ));
1241
1242 self
1243 }
1244
1245 /// Subscribe to all updates for the room with the given ID.
1246 ///
1247 /// The returned receiver will receive a new message for each sync response
1248 /// that contains updates for that room.
1249 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1250 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1251 btree_map::Entry::Vacant(entry) => {
1252 let (tx, rx) = broadcast::channel(8);
1253 entry.insert(tx);
1254 rx
1255 }
1256 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1257 }
1258 }
1259
1260 /// Subscribe to all updates to all rooms, whenever any has been received in
1261 /// a sync response.
1262 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1263 self.inner.room_updates_sender.subscribe()
1264 }
1265
1266 pub(crate) async fn notification_handlers(
1267 &self,
1268 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1269 self.inner.notification_handlers.read().await
1270 }
1271
1272 /// Get all the rooms the client knows about.
1273 ///
1274 /// This will return the list of joined, invited, and left rooms.
1275 pub fn rooms(&self) -> Vec<Room> {
1276 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1277 }
1278
1279 /// Get all the rooms the client knows about, filtered by room state.
1280 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1281 self.base_client()
1282 .rooms_filtered(filter)
1283 .into_iter()
1284 .map(|room| Room::new(self.clone(), room))
1285 .collect()
1286 }
1287
1288 /// Get a stream of all the rooms, in addition to the existing rooms.
1289 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1290 let (rooms, stream) = self.base_client().rooms_stream();
1291
1292 let map_room = |room| Room::new(self.clone(), room);
1293
1294 (
1295 rooms.into_iter().map(map_room).collect(),
1296 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1297 )
1298 }
1299
1300 /// Returns the joined rooms this client knows about.
1301 pub fn joined_rooms(&self) -> Vec<Room> {
1302 self.rooms_filtered(RoomStateFilter::JOINED)
1303 }
1304
1305 /// Returns the invited rooms this client knows about.
1306 pub fn invited_rooms(&self) -> Vec<Room> {
1307 self.rooms_filtered(RoomStateFilter::INVITED)
1308 }
1309
1310 /// Returns the left rooms this client knows about.
1311 pub fn left_rooms(&self) -> Vec<Room> {
1312 self.rooms_filtered(RoomStateFilter::LEFT)
1313 }
1314
1315 /// Returns the joined space rooms this client knows about.
1316 pub fn joined_space_rooms(&self) -> Vec<Room> {
1317 self.base_client()
1318 .rooms_filtered(RoomStateFilter::JOINED)
1319 .into_iter()
1320 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1321 .collect()
1322 }
1323
1324 /// Get a room with the given room id.
1325 ///
1326 /// # Arguments
1327 ///
1328 /// `room_id` - The unique id of the room that should be fetched.
1329 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1330 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1331 }
1332
1333 /// Gets the preview of a room, whether the current user has joined it or
1334 /// not.
1335 pub async fn get_room_preview(
1336 &self,
1337 room_or_alias_id: &RoomOrAliasId,
1338 via: Vec<OwnedServerName>,
1339 ) -> Result<RoomPreview> {
1340 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1341 Ok(room_id) => room_id.to_owned(),
1342 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1343 };
1344
1345 if let Some(room) = self.get_room(&room_id) {
1346 // The cached data can only be trusted if the room state is joined or
1347 // banned: for invite and knock rooms, no updates will be received
1348 // for the rooms after the invite/knock action took place so we may
1349 // have very out to date data for important fields such as
1350 // `join_rule`. For left rooms, the homeserver should return the latest info.
1351 match room.state() {
1352 RoomState::Joined | RoomState::Banned => {
1353 return Ok(RoomPreview::from_known_room(&room).await);
1354 }
1355 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1356 }
1357 }
1358
1359 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1360 }
1361
1362 /// Resolve a room alias to a room id and a list of servers which know
1363 /// about it.
1364 ///
1365 /// # Arguments
1366 ///
1367 /// `room_alias` - The room alias to be resolved.
1368 pub async fn resolve_room_alias(
1369 &self,
1370 room_alias: &RoomAliasId,
1371 ) -> HttpResult<get_alias::v3::Response> {
1372 let request = get_alias::v3::Request::new(room_alias.to_owned());
1373 self.send(request).await
1374 }
1375
1376 /// Checks if a room alias is not in use yet.
1377 ///
1378 /// Returns:
/// - `Ok(true)` if the room alias is available (the resolve alias request
///   failed with a "not found" error).
/// - `Ok(false)` if it's not (the alias could be resolved, so it's already in
///   use).
1382 /// - An `Err` otherwise.
1383 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1384 match self.resolve_room_alias(alias).await {
1385 // The room alias was resolved, so it's already in use.
1386 Ok(_) => Ok(false),
1387 Err(error) => {
1388 match error.client_api_error_kind() {
1389 // The room alias wasn't found, so it's available.
1390 Some(ErrorKind::NotFound) => Ok(true),
1391 _ => Err(error),
1392 }
1393 }
1394 }
1395 }
1396
1397 /// Adds a new room alias associated with a room to the room directory.
1398 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1399 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1400 self.send(request).await?;
1401 Ok(())
1402 }
1403
1404 /// Removes a room alias from the room directory.
1405 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1406 let request = delete_alias::v3::Request::new(alias.to_owned());
1407 self.send(request).await?;
1408 Ok(())
1409 }
1410
1411 /// Update the homeserver from the login response well-known if needed.
1412 ///
1413 /// # Arguments
1414 ///
1415 /// * `login_well_known` - The `well_known` field from a successful login
1416 /// response.
1417 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1418 if self.inner.respect_login_well_known
1419 && let Some(well_known) = login_well_known
1420 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1421 {
1422 self.set_homeserver(homeserver);
1423 }
1424 }
1425
1426 /// Similar to [`Client::restore_session_with`], with
1427 /// [`RoomLoadSettings::default()`].
1428 ///
1429 /// # Panics
1430 ///
1431 /// Panics if a session was already restored or logged in.
1432 #[instrument(skip_all)]
1433 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1434 self.restore_session_with(session, RoomLoadSettings::default()).await
1435 }
1436
1437 /// Restore a session previously logged-in using one of the available
1438 /// authentication APIs. The number of rooms to restore is controlled by
1439 /// [`RoomLoadSettings`].
1440 ///
1441 /// See the documentation of the corresponding authentication API's
1442 /// `restore_session` method for more information.
1443 ///
1444 /// # Panics
1445 ///
1446 /// Panics if a session was already restored or logged in.
1447 #[instrument(skip_all)]
1448 pub async fn restore_session_with(
1449 &self,
1450 session: impl Into<AuthSession>,
1451 room_load_settings: RoomLoadSettings,
1452 ) -> Result<()> {
1453 let session = session.into();
1454 match session {
1455 AuthSession::Matrix(session) => {
1456 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1457 }
1458 AuthSession::OAuth(session) => {
1459 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1460 }
1461 }
1462 }
1463
1464 /// Refresh the access token using the authentication API used to log into
1465 /// this session.
1466 ///
1467 /// See the documentation of the authentication API's `refresh_access_token`
1468 /// method for more information.
1469 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1470 let Some(auth_api) = self.auth_api() else {
1471 return Err(RefreshTokenError::RefreshTokenRequired);
1472 };
1473
1474 match auth_api {
1475 AuthApi::Matrix(api) => {
1476 trace!("Token refresh: Using the homeserver.");
1477 Box::pin(api.refresh_access_token()).await?;
1478 }
1479 AuthApi::OAuth(api) => {
1480 trace!("Token refresh: Using OAuth 2.0.");
1481 Box::pin(api.refresh_access_token()).await?;
1482 }
1483 }
1484
1485 Ok(())
1486 }
1487
1488 /// Log out the current session using the proper authentication API.
1489 ///
1490 /// # Errors
1491 ///
1492 /// Returns an error if the session is not authenticated or if an error
1493 /// occurred while making the request to the server.
1494 pub async fn logout(&self) -> Result<(), Error> {
1495 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1496 match auth_api {
1497 AuthApi::Matrix(matrix_auth) => {
1498 matrix_auth.logout().await?;
1499 Ok(())
1500 }
1501 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1502 }
1503 }
1504
1505 /// Get or upload a sync filter.
1506 ///
1507 /// This method will either get a filter ID from the store or upload the
1508 /// filter definition to the homeserver and return the new filter ID.
1509 ///
1510 /// # Arguments
1511 ///
1512 /// * `filter_name` - The unique name of the filter, this name will be used
1513 /// locally to store and identify the filter ID returned by the server.
1514 ///
1515 /// * `definition` - The filter definition that should be uploaded to the
1516 /// server if no filter ID can be found in the store.
1517 ///
1518 /// # Examples
1519 ///
1520 /// ```no_run
1521 /// # use matrix_sdk::{
1522 /// # Client, config::SyncSettings,
1523 /// # ruma::api::client::{
1524 /// # filter::{
1525 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1526 /// # },
1527 /// # sync::sync_events::v3::Filter,
1528 /// # }
1529 /// # };
1530 /// # use url::Url;
1531 /// # async {
1532 /// # let homeserver = Url::parse("http://example.com").unwrap();
1533 /// # let client = Client::new(homeserver).await.unwrap();
1534 /// let mut filter = FilterDefinition::default();
1535 ///
1536 /// // Let's enable member lazy loading.
1537 /// filter.room.state.lazy_load_options =
1538 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1539 ///
1540 /// let filter_id = client
1541 /// .get_or_upload_filter("sync", filter)
1542 /// .await
1543 /// .unwrap();
1544 ///
1545 /// let sync_settings = SyncSettings::new()
1546 /// .filter(Filter::FilterId(filter_id));
1547 ///
1548 /// let response = client.sync_once(sync_settings).await.unwrap();
/// # };
/// ```
1550 #[instrument(skip(self, definition))]
1551 pub async fn get_or_upload_filter(
1552 &self,
1553 filter_name: &str,
1554 definition: FilterDefinition,
1555 ) -> Result<String> {
1556 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1557 debug!("Found filter locally");
1558 Ok(filter)
1559 } else {
1560 debug!("Didn't find filter locally");
1561 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1562 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1563 let response = self.send(request).await?;
1564
1565 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1566
1567 Ok(response.filter_id)
1568 }
1569 }
1570
1571 /// Prepare to join a room by ID, by getting the current details about it
1572 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1573 let room = self.get_room(room_id)?;
1574
1575 let inviter = match room.invite_details().await {
1576 Ok(details) => details.inviter,
1577 Err(Error::WrongRoomState(_)) => None,
1578 Err(e) => {
1579 warn!("Error fetching invite details for room: {e:?}");
1580 None
1581 }
1582 };
1583
1584 Some(PreJoinRoomInfo { inviter })
1585 }
1586
1587 /// Finish joining a room.
1588 ///
1589 /// If the room was an invite that should be marked as a DM, will include it
1590 /// in the DM event after creating the joined room.
1591 ///
1592 /// If encrypted history sharing is enabled, will check to see if we have a
1593 /// key bundle, and import it if so.
1594 ///
1595 /// # Arguments
1596 ///
1597 /// * `room_id` - The `RoomId` of the room that was joined.
1598 /// * `pre_join_room_info` - Information about the room before we joined.
1599 async fn finish_join_room(
1600 &self,
1601 room_id: &RoomId,
1602 pre_join_room_info: Option<PreJoinRoomInfo>,
1603 ) -> Result<Room> {
1604 info!(?room_id, ?pre_join_room_info, "Completing room join");
1605 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1606 room.state() == RoomState::Invited
1607 && room.is_direct().await.unwrap_or_else(|e| {
1608 warn!(%room_id, "is_direct() failed: {e}");
1609 false
1610 })
1611 } else {
1612 false
1613 };
1614
1615 let base_room = self
1616 .base_client()
1617 .room_joined(
1618 room_id,
1619 pre_join_room_info
1620 .as_ref()
1621 .and_then(|info| info.inviter.as_ref())
1622 .map(|i| i.user_id().to_owned()),
1623 )
1624 .await?;
1625 let room = Room::new(self.clone(), base_room);
1626
1627 if mark_as_dm {
1628 room.set_is_direct(true).await?;
1629 }
1630
1631 #[cfg(feature = "e2e-encryption")]
1632 if self.inner.enable_share_history_on_invite
1633 && let Some(inviter) =
1634 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1635 {
1636 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1637 .await?;
1638 }
1639
1640 // Suppress "unused variable" and "unused field" lints
1641 #[cfg(not(feature = "e2e-encryption"))]
1642 let _ = pre_join_room_info.map(|i| i.inviter);
1643
1644 Ok(room)
1645 }
1646
1647 /// Join a room by `RoomId`.
1648 ///
1649 /// Returns the `Room` in the joined state.
1650 ///
1651 /// # Arguments
1652 ///
1653 /// * `room_id` - The `RoomId` of the room to be joined.
1654 #[instrument(skip(self))]
1655 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1656 // See who invited us to this room, if anyone. Note we have to do this before
1657 // making the `/join` request, otherwise we could race against the sync.
1658 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1659
1660 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1661 let response = self.send(request).await?;
1662 self.finish_join_room(&response.room_id, pre_join_info).await
1663 }
1664
1665 /// Join a room by `RoomOrAliasId`.
1666 ///
1667 /// Returns the `Room` in the joined state.
1668 ///
1669 /// # Arguments
1670 ///
1671 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1672 /// alias looks like `#name:example.com`.
1673 /// * `server_names` - The server names to be used for resolving the alias,
1674 /// if needs be.
1675 #[instrument(skip(self))]
1676 pub async fn join_room_by_id_or_alias(
1677 &self,
1678 alias: &RoomOrAliasId,
1679 server_names: &[OwnedServerName],
1680 ) -> Result<Room> {
1681 let pre_join_info = {
1682 match alias.try_into() {
1683 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1684 Err(_) => {
1685 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1686 // responding to an invitation to the room, and therefore don't need to handle
1687 // things that happen as a result of invites.
1688 None
1689 }
1690 }
1691 };
1692 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1693 via: server_names.to_owned(),
1694 });
1695 let response = self.send(request).await?;
1696 self.finish_join_room(&response.room_id, pre_join_info).await
1697 }
1698
1699 /// Search the homeserver's directory of public rooms.
1700 ///
1701 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1702 /// a `get_public_rooms::Response`.
1703 ///
1704 /// # Arguments
1705 ///
1706 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1707 ///
1708 /// * `since` - Pagination token from a previous request.
1709 ///
1710 /// * `server` - The name of the server, if `None` the requested server is
1711 /// used.
1712 ///
1713 /// # Examples
1714 /// ```no_run
1715 /// use matrix_sdk::Client;
1716 /// # use url::Url;
1717 /// # let homeserver = Url::parse("http://example.com").unwrap();
1718 /// # let limit = Some(10);
1719 /// # let since = Some("since token");
1720 /// # let server = Some("servername.com".try_into().unwrap());
1721 /// # async {
1722 /// let mut client = Client::new(homeserver).await.unwrap();
1723 ///
1724 /// client.public_rooms(limit, since, server).await;
1725 /// # };
1726 /// ```
1727 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1728 pub async fn public_rooms(
1729 &self,
1730 limit: Option<u32>,
1731 since: Option<&str>,
1732 server: Option<&ServerName>,
1733 ) -> HttpResult<get_public_rooms::v3::Response> {
1734 let limit = limit.map(UInt::from);
1735
1736 let request = assign!(get_public_rooms::v3::Request::new(), {
1737 limit,
1738 since: since.map(ToOwned::to_owned),
1739 server: server.map(ToOwned::to_owned),
1740 });
1741 self.send(request).await
1742 }
1743
1744 /// Create a room with the given parameters.
1745 ///
1746 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1747 /// created room.
1748 ///
1749 /// If you want to create a direct message with one specific user, you can
1750 /// use [`create_dm`][Self::create_dm], which is more convenient than
1751 /// assembling the [`create_room::v3::Request`] yourself.
1752 ///
1753 /// If the `is_direct` field of the request is set to `true` and at least
1754 /// one user is invited, the room will be automatically added to the direct
1755 /// rooms in the account data.
1756 ///
1757 /// # Examples
1758 ///
1759 /// ```no_run
1760 /// use matrix_sdk::{
1761 /// Client,
1762 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1763 /// };
1764 /// # use url::Url;
1765 /// #
1766 /// # async {
1767 /// # let homeserver = Url::parse("http://example.com").unwrap();
1768 /// let request = CreateRoomRequest::new();
1769 /// let client = Client::new(homeserver).await.unwrap();
1770 /// assert!(client.create_room(request).await.is_ok());
1771 /// # };
1772 /// ```
1773 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1774 let invite = request.invite.clone();
1775 let is_direct_room = request.is_direct;
1776 let response = self.send(request).await?;
1777
1778 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1779
1780 let joined_room = Room::new(self.clone(), base_room);
1781
1782 if is_direct_room
1783 && !invite.is_empty()
1784 && let Err(error) =
1785 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1786 {
1787 // FIXME: Retry in the background
1788 error!("Failed to mark room as DM: {error}");
1789 }
1790
1791 Ok(joined_room)
1792 }
1793
1794 /// Create a DM room.
1795 ///
1796 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1797 /// given user being invited, the room marked `is_direct` and both the
1798 /// creator and invitee getting the default maximum power level.
1799 ///
1800 /// If the `e2e-encryption` feature is enabled, the room will also be
1801 /// encrypted.
1802 ///
1803 /// # Arguments
1804 ///
1805 /// * `user_id` - The ID of the user to create a DM for.
1806 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1807 #[cfg(feature = "e2e-encryption")]
1808 let initial_state = vec![
1809 InitialStateEvent::with_empty_state_key(
1810 RoomEncryptionEventContent::with_recommended_defaults(),
1811 )
1812 .to_raw_any(),
1813 ];
1814
1815 #[cfg(not(feature = "e2e-encryption"))]
1816 let initial_state = vec![];
1817
1818 let request = assign!(create_room::v3::Request::new(), {
1819 invite: vec![user_id.to_owned()],
1820 is_direct: true,
1821 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1822 initial_state,
1823 });
1824
1825 self.create_room(request).await
1826 }
1827
1828 /// Get the existing DM room with the given user, if any.
1829 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1830 let rooms = self.joined_rooms();
1831
1832 // Find the room we share with the `user_id` and only with `user_id`
1833 let room = rooms.into_iter().find(|r| {
1834 let targets = r.direct_targets();
1835 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1836 });
1837
1838 trace!(?user_id, ?room, "Found DM room with user");
1839 room
1840 }
1841
1842 /// Search the homeserver's directory for public rooms with a filter.
1843 ///
1844 /// # Arguments
1845 ///
1846 /// * `room_search` - The easiest way to create this request is using the
1847 /// `get_public_rooms_filtered::Request` itself.
1848 ///
1849 /// # Examples
1850 ///
1851 /// ```no_run
1852 /// # use url::Url;
1853 /// # use matrix_sdk::Client;
1854 /// # async {
1855 /// # let homeserver = Url::parse("http://example.com")?;
1856 /// use matrix_sdk::ruma::{
1857 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1858 /// };
1859 /// # let mut client = Client::new(homeserver).await?;
1860 ///
1861 /// let mut filter = Filter::new();
1862 /// filter.generic_search_term = Some("rust".to_owned());
1863 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1864 /// request.filter = filter;
1865 ///
1866 /// let response = client.public_rooms_filtered(request).await?;
1867 ///
1868 /// for room in response.chunk {
1869 /// println!("Found room {room:?}");
1870 /// }
1871 /// # anyhow::Ok(()) };
1872 /// ```
1873 pub async fn public_rooms_filtered(
1874 &self,
1875 request: get_public_rooms_filtered::v3::Request,
1876 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1877 self.send(request).await
1878 }
1879
1880 /// Send an arbitrary request to the server, without updating client state.
1881 ///
1882 /// **Warning:** Because this method *does not* update the client state, it
1883 /// is important to make sure that you account for this yourself, and
1884 /// use wrapper methods where available. This method should *only* be
1885 /// used if a wrapper method for the endpoint you'd like to use is not
1886 /// available.
1887 ///
1888 /// # Arguments
1889 ///
1890 /// * `request` - A filled out and valid request for the endpoint to be hit
1891 ///
1892 /// * `timeout` - An optional request timeout setting, this overrides the
1893 /// default request setting if one was set.
1894 ///
1895 /// # Examples
1896 ///
1897 /// ```no_run
1898 /// # use matrix_sdk::{Client, config::SyncSettings};
1899 /// # use url::Url;
1900 /// # async {
1901 /// # let homeserver = Url::parse("http://localhost:8080")?;
1902 /// # let mut client = Client::new(homeserver).await?;
1903 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1904 ///
1905 /// // First construct the request you want to make
1906 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1907 /// // for all available Endpoints
1908 /// let user_id = user_id!("@example:localhost").to_owned();
1909 /// let request = profile::get_profile::v3::Request::new(user_id);
1910 ///
1911 /// // Start the request using Client::send()
1912 /// let response = client.send(request).await?;
1913 ///
1914 /// // Check the corresponding Response struct to find out what types are
1915 /// // returned
1916 /// # anyhow::Ok(()) };
1917 /// ```
1918 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1919 where
1920 Request: OutgoingRequest + Clone + Debug,
1921 for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1922 Request::PathBuilder: SupportedPathBuilder,
1923 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1924 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1925 {
1926 SendRequest {
1927 client: self.clone(),
1928 request,
1929 config: None,
1930 send_progress: Default::default(),
1931 }
1932 }
1933
1934 pub(crate) async fn send_inner<Request>(
1935 &self,
1936 request: Request,
1937 config: Option<RequestConfig>,
1938 send_progress: SharedObservable<TransmissionProgress>,
1939 ) -> HttpResult<Request::IncomingResponse>
1940 where
1941 Request: OutgoingRequest + Debug,
1942 for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1943 Request::PathBuilder: SupportedPathBuilder,
1944 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1945 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1946 {
1947 let homeserver = self.homeserver().to_string();
1948 let access_token = self.access_token();
1949 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1950
1951 let path_builder_input =
1952 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1953
1954 let result = self
1955 .inner
1956 .http_client
1957 .send(
1958 request,
1959 config,
1960 homeserver,
1961 access_token.as_deref(),
1962 path_builder_input,
1963 send_progress,
1964 )
1965 .await;
1966
1967 if let Err(Some(ErrorKind::UnknownToken { .. })) =
1968 result.as_ref().map_err(HttpError::client_api_error_kind)
1969 && let Some(access_token) = &access_token
1970 {
1971 // Mark the access token as expired.
1972 self.auth_ctx().set_access_token_expired(access_token);
1973 }
1974
1975 result
1976 }
1977
1978 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1979 _ = self
1980 .inner
1981 .auth_ctx
1982 .session_change_sender
1983 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1984 }
1985
1986 /// Fetches server versions from network; no caching.
1987 pub async fn fetch_server_versions(
1988 &self,
1989 request_config: Option<RequestConfig>,
1990 ) -> HttpResult<get_supported_versions::Response> {
1991 // Since this was called by the user, try to refresh the access token if
1992 // necessary.
1993 self.fetch_server_versions_inner(false, request_config).await
1994 }
1995
1996 /// Fetches server versions from network; no caching.
1997 ///
1998 /// If the access token is expired and `failsafe` is `false`, this will
1999 /// attempt to refresh the access token, otherwise this will try to make an
2000 /// unauthenticated request instead.
2001 pub(crate) async fn fetch_server_versions_inner(
2002 &self,
2003 failsafe: bool,
2004 request_config: Option<RequestConfig>,
2005 ) -> HttpResult<get_supported_versions::Response> {
2006 if !failsafe {
2007 // `Client::send()` handles refreshing access tokens.
2008 return self
2009 .send(get_supported_versions::Request::new())
2010 .with_request_config(request_config)
2011 .await;
2012 }
2013
2014 let homeserver = self.homeserver().to_string();
2015
2016 // If we have a fresh access token, try with it first.
2017 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2018 && self.auth_ctx().has_valid_access_token()
2019 && let Some(access_token) = self.access_token()
2020 {
2021 let result = self
2022 .inner
2023 .http_client
2024 .send(
2025 get_supported_versions::Request::new(),
2026 request_config,
2027 homeserver.clone(),
2028 Some(&access_token),
2029 (),
2030 Default::default(),
2031 )
2032 .await;
2033
2034 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2035 result.as_ref().map_err(HttpError::client_api_error_kind)
2036 {
2037 // If the access token is actually expired, mark it as expired and fallback to
2038 // the unauthenticated request below.
2039 self.auth_ctx().set_access_token_expired(&access_token);
2040 } else {
2041 // If the request succeeded or it's an other error, just stop now.
2042 return result;
2043 }
2044 }
2045
2046 // Try without authentication.
2047 self.inner
2048 .http_client
2049 .send(
2050 get_supported_versions::Request::new(),
2051 request_config,
2052 homeserver.clone(),
2053 None,
2054 (),
2055 Default::default(),
2056 )
2057 .await
2058 }
2059
2060 /// Fetches client well_known from network; no caching.
2061 ///
2062 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2063 /// well-known contents.
2064 /// 2. If it's not, we try extracting the server name from the
2065 /// [`Client::user_id`] and building the server URL from it.
2066 /// 3. If we couldn't get the well-known contents with either the explicit
2067 /// server name or the implicit extracted one, we try the homeserver URL
2068 /// as a last resort.
2069 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2070 let homeserver = self.homeserver();
2071 let scheme = homeserver.scheme();
2072
2073 // Use the server name, either an explicit one or an implicit one taken from
2074 // the user id: sometimes we'll have only the homeserver url available and no
2075 // server name, but the server name can be extracted from the current user id.
2076 let server_url = self
2077 .server()
2078 .map(|server| server.to_string())
2079 // If the server name wasn't available, extract it from the user id and build a URL:
2080 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2081 // will be the same for the public server url, lacking a better candidate.
2082 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2083
2084 // If the server name is available, first try using it
2085 let response = if let Some(server_url) = server_url {
2086 // First try using the server name
2087 self.fetch_client_well_known_with_url(server_url).await
2088 } else {
2089 None
2090 };
2091
2092 // If we didn't get a well-known value yet, try with the homeserver url instead:
2093 if response.is_none() {
2094 // Sometimes people configure their well-known directly on the homeserver so use
2095 // this as a fallback when the server name is unknown.
2096 warn!(
2097 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2098 );
2099 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2100 } else {
2101 response
2102 }
2103 }
2104
2105 async fn fetch_client_well_known_with_url(
2106 &self,
2107 url: String,
2108 ) -> Option<discover_homeserver::Response> {
2109 let well_known = self
2110 .inner
2111 .http_client
2112 .send(
2113 discover_homeserver::Request::new(),
2114 Some(RequestConfig::short_retry()),
2115 url,
2116 None,
2117 (),
2118 Default::default(),
2119 )
2120 .await;
2121
2122 match well_known {
2123 Ok(well_known) => Some(well_known),
2124 Err(http_error) => {
2125 // It is perfectly valid to not have a well-known file.
2126 // Maybe we should check for a specific error code to be sure?
2127 warn!("Failed to fetch client well-known: {http_error}");
2128 None
2129 }
2130 }
2131 }
2132
2133 /// Load supported versions from storage, or fetch them from network and
2134 /// cache them.
2135 ///
2136 /// If `failsafe` is true, this will try to minimize side effects to avoid
2137 /// possible deadlocks.
2138 async fn load_or_fetch_supported_versions(
2139 &self,
2140 failsafe: bool,
2141 ) -> HttpResult<SupportedVersionsResponse> {
2142 match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2143 Ok(Some(stored)) => {
2144 if let Some(supported_versions) =
2145 stored.into_supported_versions().and_then(|value| value.into_data())
2146 {
2147 return Ok(supported_versions);
2148 }
2149 }
2150 Ok(None) => {
2151 // fallthrough: cache is empty
2152 }
2153 Err(err) => {
2154 warn!("error when loading cached supported versions: {err}");
2155 // fallthrough to network.
2156 }
2157 }
2158
2159 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2160 let supported_versions = SupportedVersionsResponse {
2161 versions: server_versions.versions,
2162 unstable_features: server_versions.unstable_features,
2163 };
2164
2165 // Only attempt to cache the result in storage if the request was authenticated.
2166 if self.auth_ctx().has_valid_access_token()
2167 && let Err(err) = self
2168 .state_store()
2169 .set_kv_data(
2170 StateStoreDataKey::SupportedVersions,
2171 StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2172 supported_versions.clone(),
2173 )),
2174 )
2175 .await
2176 {
2177 warn!("error when caching supported versions: {err}");
2178 }
2179
2180 Ok(supported_versions)
2181 }
2182
2183 pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2184 let supported_versions = &self.inner.caches.supported_versions;
2185
2186 if let CachedValue::Cached(val) = &*supported_versions.read().await {
2187 Some(val.clone())
2188 } else {
2189 None
2190 }
2191 }
2192
2193 /// Get the Matrix versions and features supported by the homeserver by
2194 /// fetching them from the server or the cache.
2195 ///
2196 /// This is equivalent to calling both [`Client::server_versions()`] and
2197 /// [`Client::unstable_features()`]. To always fetch the result from the
2198 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2199 /// and then `.as_supported_versions()` on the response.
2200 ///
2201 /// # Examples
2202 ///
2203 /// ```no_run
2204 /// use ruma::api::{FeatureFlag, MatrixVersion};
2205 /// # use matrix_sdk::{Client, config::SyncSettings};
2206 /// # use url::Url;
2207 /// # async {
2208 /// # let homeserver = Url::parse("http://localhost:8080")?;
2209 /// # let mut client = Client::new(homeserver).await?;
2210 ///
2211 /// let supported = client.supported_versions().await?;
2212 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2213 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2214 ///
2215 /// let msc_x_feature = FeatureFlag::from("msc_x");
2216 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2217 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2218 /// # anyhow::Ok(()) };
2219 /// ```
2220 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2221 self.supported_versions_inner(false).await
2222 }
2223
2224 /// Get the Matrix versions and features supported by the homeserver by
2225 /// fetching them from the server or the cache.
2226 ///
2227 /// If `failsafe` is true, this will try to minimize side effects to avoid
2228 /// possible deadlocks.
2229 pub(crate) async fn supported_versions_inner(
2230 &self,
2231 failsafe: bool,
2232 ) -> HttpResult<SupportedVersions> {
2233 let cached_supported_versions = &self.inner.caches.supported_versions;
2234 if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2235 return Ok(val.clone());
2236 }
2237
2238 let mut guarded_supported_versions = cached_supported_versions.write().await;
2239 if let CachedValue::Cached(val) = &*guarded_supported_versions {
2240 return Ok(val.clone());
2241 }
2242
2243 let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2244
2245 // Fill both unstable features and server versions at once.
2246 let mut supported_versions = supported.supported_versions();
2247 if supported_versions.versions.is_empty() {
2248 supported_versions.versions = [MatrixVersion::V1_0].into();
2249 }
2250
2251 // Only cache the result if the request was authenticated.
2252 if self.auth_ctx().has_valid_access_token() {
2253 *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2254 }
2255
2256 Ok(supported_versions)
2257 }
2258
2259 /// Get the Matrix versions and features supported by the homeserver by
2260 /// fetching them from the cache.
2261 ///
2262 /// For a version of this function that fetches the supported versions and
2263 /// features from the homeserver if the [`SupportedVersions`] aren't
2264 /// found in the cache, take a look at the [`Client::server_versions()`]
2265 /// method.
2266 ///
2267 /// # Examples
2268 ///
2269 /// ```no_run
2270 /// use ruma::api::{FeatureFlag, MatrixVersion};
2271 /// # use matrix_sdk::{Client, config::SyncSettings};
2272 /// # use url::Url;
2273 /// # async {
2274 /// # let homeserver = Url::parse("http://localhost:8080")?;
2275 /// # let mut client = Client::new(homeserver).await?;
2276 ///
2277 /// let supported =
2278 /// if let Some(supported) = client.supported_versions_cached().await? {
2279 /// supported
2280 /// } else {
2281 /// client.fetch_server_versions(None).await?.as_supported_versions()
2282 /// };
2283 ///
2284 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2285 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2286 ///
2287 /// let msc_x_feature = FeatureFlag::from("msc_x");
2288 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2289 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2290 /// # anyhow::Ok(()) };
2291 /// ```
2292 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2293 if let Some(cached) = self.get_cached_supported_versions().await {
2294 Ok(Some(cached))
2295 } else if let Some(stored) =
2296 self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2297 {
2298 if let Some(supported) =
2299 stored.into_supported_versions().and_then(|value| value.into_data())
2300 {
2301 Ok(Some(supported.supported_versions()))
2302 } else {
2303 Ok(None)
2304 }
2305 } else {
2306 Ok(None)
2307 }
2308 }
2309
2310 /// Get the Matrix versions supported by the homeserver by fetching them
2311 /// from the server or the cache.
2312 ///
2313 /// # Examples
2314 ///
2315 /// ```no_run
2316 /// use ruma::api::MatrixVersion;
2317 /// # use matrix_sdk::{Client, config::SyncSettings};
2318 /// # use url::Url;
2319 /// # async {
2320 /// # let homeserver = Url::parse("http://localhost:8080")?;
2321 /// # let mut client = Client::new(homeserver).await?;
2322 ///
2323 /// let server_versions = client.server_versions().await?;
2324 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2325 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2326 /// # anyhow::Ok(()) };
2327 /// ```
2328 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2329 Ok(self.supported_versions().await?.versions)
2330 }
2331
2332 /// Get the unstable features supported by the homeserver by fetching them
2333 /// from the server or the cache.
2334 ///
2335 /// # Examples
2336 ///
2337 /// ```no_run
2338 /// use matrix_sdk::ruma::api::FeatureFlag;
2339 /// # use matrix_sdk::{Client, config::SyncSettings};
2340 /// # use url::Url;
2341 /// # async {
2342 /// # let homeserver = Url::parse("http://localhost:8080")?;
2343 /// # let mut client = Client::new(homeserver).await?;
2344 ///
2345 /// let msc_x_feature = FeatureFlag::from("msc_x");
2346 /// let unstable_features = client.unstable_features().await?;
2347 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2348 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2349 /// # anyhow::Ok(()) };
2350 /// ```
2351 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2352 Ok(self.supported_versions().await?.features)
2353 }
2354
2355 /// Empty the supported versions and unstable features cache.
2356 ///
2357 /// Since the SDK caches the supported versions, it's possible to have a
2358 /// stale entry in the cache. This functions makes it possible to force
2359 /// reset it.
2360 pub async fn reset_supported_versions(&self) -> Result<()> {
2361 // Empty the in-memory caches.
2362 self.inner.caches.supported_versions.write().await.take();
2363
2364 // Empty the store cache.
2365 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2366 }
2367
2368 /// Load well-known from storage, or fetch it from network and cache it.
2369 async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2370 match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2371 Ok(Some(stored)) => {
2372 if let Some(well_known) =
2373 stored.into_well_known().and_then(|value| value.into_data())
2374 {
2375 return Ok(well_known);
2376 }
2377 }
2378 Ok(None) => {
2379 // fallthrough: cache is empty
2380 }
2381 Err(err) => {
2382 warn!("error when loading cached well-known: {err}");
2383 // fallthrough to network.
2384 }
2385 }
2386
2387 let well_known = self.fetch_client_well_known().await.map(Into::into);
2388
2389 // Attempt to cache the result in storage.
2390 if let Err(err) = self
2391 .state_store()
2392 .set_kv_data(
2393 StateStoreDataKey::WellKnown,
2394 StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2395 )
2396 .await
2397 {
2398 warn!("error when caching well-known: {err}");
2399 }
2400
2401 Ok(well_known)
2402 }
2403
2404 async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2405 let well_known = &self.inner.caches.well_known;
2406 if let CachedValue::Cached(val) = &*well_known.read().await {
2407 return Ok(val.clone());
2408 }
2409
2410 let mut guarded_well_known = well_known.write().await;
2411 if let CachedValue::Cached(val) = &*guarded_well_known {
2412 return Ok(val.clone());
2413 }
2414
2415 let well_known = self.load_or_fetch_well_known().await?;
2416
2417 *guarded_well_known = CachedValue::Cached(well_known.clone());
2418
2419 Ok(well_known)
2420 }
2421
2422 /// Get information about the homeserver's advertised RTC foci by fetching
2423 /// the well-known file from the server or the cache.
2424 ///
2425 /// # Examples
2426 /// ```no_run
2427 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2428 /// # use url::Url;
2429 /// # async {
2430 /// # let homeserver = Url::parse("http://localhost:8080")?;
2431 /// # let mut client = Client::new(homeserver).await?;
2432 /// let rtc_foci = client.rtc_foci().await?;
2433 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2434 /// RtcFocusInfo::LiveKit(info) => Some(info),
2435 /// _ => None,
2436 /// });
2437 /// if let Some(info) = default_livekit_focus_info {
2438 /// println!("Default LiveKit service URL: {}", info.service_url);
2439 /// }
2440 /// # anyhow::Ok(()) };
2441 /// ```
2442 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2443 let well_known = self.get_or_load_and_cache_well_known().await?;
2444
2445 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2446 }
2447
2448 /// Empty the well-known cache.
2449 ///
2450 /// Since the SDK caches the well-known, it's possible to have a stale entry
2451 /// in the cache. This functions makes it possible to force reset it.
2452 pub async fn reset_well_known(&self) -> Result<()> {
2453 // Empty the in-memory caches.
2454 self.inner.caches.well_known.write().await.take();
2455
2456 // Empty the store cache.
2457 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2458 }
2459
2460 /// Check whether MSC 4028 is enabled on the homeserver.
2461 ///
2462 /// # Examples
2463 ///
2464 /// ```no_run
2465 /// # use matrix_sdk::{Client, config::SyncSettings};
2466 /// # use url::Url;
2467 /// # async {
2468 /// # let homeserver = Url::parse("http://localhost:8080")?;
2469 /// # let mut client = Client::new(homeserver).await?;
2470 /// let msc4028_enabled =
2471 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2472 /// # anyhow::Ok(()) };
2473 /// ```
2474 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2475 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2476 }
2477
2478 /// Get information of all our own devices.
2479 ///
2480 /// # Examples
2481 ///
2482 /// ```no_run
2483 /// # use matrix_sdk::{Client, config::SyncSettings};
2484 /// # use url::Url;
2485 /// # async {
2486 /// # let homeserver = Url::parse("http://localhost:8080")?;
2487 /// # let mut client = Client::new(homeserver).await?;
2488 /// let response = client.devices().await?;
2489 ///
2490 /// for device in response.devices {
2491 /// println!(
2492 /// "Device: {} {}",
2493 /// device.device_id,
2494 /// device.display_name.as_deref().unwrap_or("")
2495 /// );
2496 /// }
2497 /// # anyhow::Ok(()) };
2498 /// ```
2499 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2500 let request = get_devices::v3::Request::new();
2501
2502 self.send(request).await
2503 }
2504
2505 /// Delete the given devices from the server.
2506 ///
2507 /// # Arguments
2508 ///
2509 /// * `devices` - The list of devices that should be deleted from the
2510 /// server.
2511 ///
2512 /// * `auth_data` - This request requires user interactive auth, the first
2513 /// request needs to set this to `None` and will always fail with an
2514 /// `UiaaResponse`. The response will contain information for the
2515 /// interactive auth and the same request needs to be made but this time
2516 /// with some `auth_data` provided.
2517 ///
2518 /// ```no_run
2519 /// # use matrix_sdk::{
2520 /// # ruma::{api::client::uiaa, device_id},
2521 /// # Client, Error, config::SyncSettings,
2522 /// # };
2523 /// # use serde_json::json;
2524 /// # use url::Url;
2525 /// # use std::collections::BTreeMap;
2526 /// # async {
2527 /// # let homeserver = Url::parse("http://localhost:8080")?;
2528 /// # let mut client = Client::new(homeserver).await?;
2529 /// let devices = &[device_id!("DEVICEID").to_owned()];
2530 ///
2531 /// if let Err(e) = client.delete_devices(devices, None).await {
2532 /// if let Some(info) = e.as_uiaa_response() {
2533 /// let mut password = uiaa::Password::new(
2534 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2535 /// "wordpass".to_owned(),
2536 /// );
2537 /// password.session = info.session.clone();
2538 ///
2539 /// client
2540 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2541 /// .await?;
2542 /// }
2543 /// }
2544 /// # anyhow::Ok(()) };
2545 pub async fn delete_devices(
2546 &self,
2547 devices: &[OwnedDeviceId],
2548 auth_data: Option<uiaa::AuthData>,
2549 ) -> HttpResult<delete_devices::v3::Response> {
2550 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2551 request.auth = auth_data;
2552
2553 self.send(request).await
2554 }
2555
2556 /// Change the display name of a device owned by the current user.
2557 ///
2558 /// Returns a `update_device::Response` which specifies the result
2559 /// of the operation.
2560 ///
2561 /// # Arguments
2562 ///
2563 /// * `device_id` - The ID of the device to change the display name of.
2564 /// * `display_name` - The new display name to set.
2565 pub async fn rename_device(
2566 &self,
2567 device_id: &DeviceId,
2568 display_name: &str,
2569 ) -> HttpResult<update_device::v3::Response> {
2570 let mut request = update_device::v3::Request::new(device_id.to_owned());
2571 request.display_name = Some(display_name.to_owned());
2572
2573 self.send(request).await
2574 }
2575
2576 /// Check whether a device with a specific ID exists on the server.
2577 ///
2578 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2579 /// with 404 and the underlying error otherwise.
2580 ///
2581 /// # Arguments
2582 ///
2583 /// * `device_id` - The ID of the device to query.
2584 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2585 let request = device::get_device::v3::Request::new(device_id);
2586 match self.send(request).await {
2587 Ok(_) => Ok(true),
2588 Err(err) => {
2589 if let Some(error) = err.as_client_api_error()
2590 && error.status_code == 404
2591 {
2592 Ok(false)
2593 } else {
2594 Err(err.into())
2595 }
2596 }
2597 }
2598 }
2599
2600 /// Synchronize the client's state with the latest state on the server.
2601 ///
2602 /// ## Syncing Events
2603 ///
2604 /// Messages or any other type of event need to be periodically fetched from
2605 /// the server, this is achieved by sending a `/sync` request to the server.
2606 ///
2607 /// The first sync is sent out without a [`token`]. The response of the
2608 /// first sync will contain a [`next_batch`] field which should then be
2609 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2610 /// don't receive the same events multiple times.
2611 ///
2612 /// ## Long Polling
2613 ///
2614 /// A sync should in the usual case always be in flight. The
2615 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2616 /// long the server will wait for new events before it will respond.
2617 /// The server will respond immediately if some new events arrive before the
2618 /// timeout has expired. If no changes arrive and the timeout expires an
2619 /// empty sync response will be sent to the client.
2620 ///
2621 /// This method of sending a request that may not receive a response
2622 /// immediately is called long polling.
2623 ///
2624 /// ## Filtering Events
2625 ///
2626 /// The number or type of messages and events that the client should receive
2627 /// from the server can be altered using a [`Filter`].
2628 ///
2629 /// Filters can be non-trivial and, since they will be sent with every sync
2630 /// request, they may take up a bunch of unnecessary bandwidth.
2631 ///
2632 /// Luckily filters can be uploaded to the server and reused using an unique
2633 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2634 /// method.
2635 ///
2636 /// # Arguments
2637 ///
2638 /// * `sync_settings` - Settings for the sync call, this allows us to set
2639 /// various options to configure the sync:
2640 /// * [`filter`] - To configure which events we receive and which get
2641 /// [filtered] by the server
2642 /// * [`timeout`] - To configure our [long polling] setup.
2643 /// * [`token`] - To tell the server which events we already received
2644 /// and where we wish to continue syncing.
2645 /// * [`full_state`] - To tell the server that we wish to receive all
2646 /// state events, regardless of our configured [`token`].
2647 /// * [`set_presence`] - To tell the server to set the presence and to
2648 /// which state.
2649 ///
2650 /// # Examples
2651 ///
2652 /// ```no_run
2653 /// # use url::Url;
2654 /// # async {
2655 /// # let homeserver = Url::parse("http://localhost:8080")?;
2656 /// # let username = "";
2657 /// # let password = "";
2658 /// use matrix_sdk::{
2659 /// Client, config::SyncSettings,
2660 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2661 /// };
2662 ///
2663 /// let client = Client::new(homeserver).await?;
2664 /// client.matrix_auth().login_username(username, password).send().await?;
2665 ///
2666 /// // Sync once so we receive the client state and old messages.
2667 /// client.sync_once(SyncSettings::default()).await?;
2668 ///
2669 /// // Register our handler so we start responding once we receive a new
2670 /// // event.
2671 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2672 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2673 /// });
2674 ///
2675 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2676 /// // from our `sync_once()` call automatically.
2677 /// client.sync(SyncSettings::default()).await;
2678 /// # anyhow::Ok(()) };
2679 /// ```
2680 ///
2681 /// [`sync`]: #method.sync
2682 /// [`SyncSettings`]: crate::config::SyncSettings
2683 /// [`token`]: crate::config::SyncSettings#method.token
2684 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2685 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2686 /// [`set_presence`]: ruma::presence::PresenceState
2687 /// [`filter`]: crate::config::SyncSettings#method.filter
2688 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2689 /// [`next_batch`]: SyncResponse#structfield.next_batch
2690 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2691 /// [long polling]: #long-polling
2692 /// [filtered]: #filtering-events
2693 #[instrument(skip(self))]
2694 pub async fn sync_once(
2695 &self,
2696 sync_settings: crate::config::SyncSettings,
2697 ) -> Result<SyncResponse> {
2698 // The sync might not return for quite a while due to the timeout.
2699 // We'll see if there's anything crypto related to send out before we
2700 // sync, i.e. if we closed our client after a sync but before the
2701 // crypto requests were sent out.
2702 //
2703 // This will mostly be a no-op.
2704 #[cfg(feature = "e2e-encryption")]
2705 if let Err(e) = self.send_outgoing_requests().await {
2706 error!(error = ?e, "Error while sending outgoing E2EE requests");
2707 }
2708
2709 let token = match sync_settings.token {
2710 SyncToken::Specific(token) => Some(token),
2711 SyncToken::NoToken => None,
2712 SyncToken::ReusePrevious => self.sync_token().await,
2713 };
2714
2715 let request = assign!(sync_events::v3::Request::new(), {
2716 filter: sync_settings.filter.map(|f| *f),
2717 since: token,
2718 full_state: sync_settings.full_state,
2719 set_presence: sync_settings.set_presence,
2720 timeout: sync_settings.timeout,
2721 use_state_after: true,
2722 });
2723 let mut request_config = self.request_config();
2724 if let Some(timeout) = sync_settings.timeout {
2725 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2726 request_config.timeout = Some(base_timeout + timeout);
2727 }
2728
2729 let response = self.send(request).with_request_config(request_config).await?;
2730 let next_batch = response.next_batch.clone();
2731 let response = self.process_sync(response).await?;
2732
2733 #[cfg(feature = "e2e-encryption")]
2734 if let Err(e) = self.send_outgoing_requests().await {
2735 error!(error = ?e, "Error while sending outgoing E2EE requests");
2736 }
2737
2738 self.inner.sync_beat.notify(usize::MAX);
2739
2740 Ok(SyncResponse::new(next_batch, response))
2741 }
2742
2743 /// Repeatedly synchronize the client state with the server.
2744 ///
2745 /// This method will only return on error, if cancellation is needed
2746 /// the method should be wrapped in a cancelable task or the
2747 /// [`Client::sync_with_callback`] method can be used or
2748 /// [`Client::sync_with_result_callback`] if you want to handle error
2749 /// cases in the loop, too.
2750 ///
2751 /// This method will internally call [`Client::sync_once`] in a loop.
2752 ///
2753 /// This method can be used with the [`Client::add_event_handler`]
2754 /// method to react to individual events. If you instead wish to handle
2755 /// events in a bulk manner the [`Client::sync_with_callback`],
2756 /// [`Client::sync_with_result_callback`] and
2757 /// [`Client::sync_stream`] methods can be used instead. Those methods
2758 /// repeatedly return the whole sync response.
2759 ///
2760 /// # Arguments
2761 ///
2762 /// * `sync_settings` - Settings for the sync call. *Note* that those
2763 /// settings will be only used for the first sync call. See the argument
2764 /// docs for [`Client::sync_once`] for more info.
2765 ///
2766 /// # Return
2767 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2768 /// up to the user of the API to check the error and decide whether the sync
2769 /// should continue or not.
2770 ///
2771 /// # Examples
2772 ///
2773 /// ```no_run
2774 /// # use url::Url;
2775 /// # async {
2776 /// # let homeserver = Url::parse("http://localhost:8080")?;
2777 /// # let username = "";
2778 /// # let password = "";
2779 /// use matrix_sdk::{
2780 /// Client, config::SyncSettings,
2781 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2782 /// };
2783 ///
2784 /// let client = Client::new(homeserver).await?;
2785 /// client.matrix_auth().login_username(&username, &password).send().await?;
2786 ///
2787 /// // Register our handler so we start responding once we receive a new
2788 /// // event.
2789 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2790 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2791 /// });
2792 ///
2793 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2794 /// // automatically.
2795 /// client.sync(SyncSettings::default()).await?;
2796 /// # anyhow::Ok(()) };
2797 /// ```
2798 ///
2799 /// [argument docs]: #method.sync_once
2800 /// [`sync_with_callback`]: #method.sync_with_callback
2801 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2802 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2803 }
2804
2805 /// Repeatedly call sync to synchronize the client state with the server.
2806 ///
2807 /// # Arguments
2808 ///
2809 /// * `sync_settings` - Settings for the sync call. *Note* that those
2810 /// settings will be only used for the first sync call. See the argument
2811 /// docs for [`Client::sync_once`] for more info.
2812 ///
2813 /// * `callback` - A callback that will be called every time a successful
2814 /// response has been fetched from the server. The callback must return a
2815 /// boolean which signalizes if the method should stop syncing. If the
2816 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2817 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2818 ///
2819 /// # Return
2820 /// The sync runs until an error occurs or the
2821 /// callback indicates that the Loop should stop. If the callback asked for
2822 /// a regular stop, the result will be `Ok(())` otherwise the
2823 /// `Err(Error)` is returned.
2824 ///
2825 /// # Examples
2826 ///
2827 /// The following example demonstrates how to sync forever while sending all
2828 /// the interesting events through a mpsc channel to another thread e.g. a
2829 /// UI thread.
2830 ///
2831 /// ```no_run
2832 /// # use std::time::Duration;
2833 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2834 /// # use url::Url;
2835 /// # async {
2836 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2837 /// # let mut client = Client::new(homeserver).await.unwrap();
2838 ///
2839 /// use tokio::sync::mpsc::channel;
2840 ///
2841 /// let (tx, rx) = channel(100);
2842 ///
2843 /// let sync_channel = &tx;
2844 /// let sync_settings = SyncSettings::new()
2845 /// .timeout(Duration::from_secs(30));
2846 ///
2847 /// client
2848 /// .sync_with_callback(sync_settings, |response| async move {
2849 /// let channel = sync_channel;
2850 /// for (room_id, room) in response.rooms.joined {
2851 /// for event in room.timeline.events {
2852 /// channel.send(event).await.unwrap();
2853 /// }
2854 /// }
2855 ///
2856 /// LoopCtrl::Continue
2857 /// })
2858 /// .await;
2859 /// };
2860 /// ```
2861 #[instrument(skip_all)]
2862 pub async fn sync_with_callback<C>(
2863 &self,
2864 sync_settings: crate::config::SyncSettings,
2865 callback: impl Fn(SyncResponse) -> C,
2866 ) -> Result<(), Error>
2867 where
2868 C: Future<Output = LoopCtrl>,
2869 {
2870 self.sync_with_result_callback(sync_settings, |result| async {
2871 Ok(callback(result?).await)
2872 })
2873 .await
2874 }
2875
2876 /// Repeatedly call sync to synchronize the client state with the server.
2877 ///
2878 /// # Arguments
2879 ///
2880 /// * `sync_settings` - Settings for the sync call. *Note* that those
2881 /// settings will be only used for the first sync call. See the argument
2882 /// docs for [`Client::sync_once`] for more info.
2883 ///
2884 /// * `callback` - A callback that will be called every time after a
2885 /// response has been received, failure or not. The callback returns a
2886 /// `Result<LoopCtrl, Error>`, too. When returning
2887 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2888 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2889 /// function returns `Ok(())`. In case the callback can't handle the
2890 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2891 /// which results in the sync ending and the `Err(Error)` being returned.
2892 ///
2893 /// # Return
2894 /// The sync runs until an error occurs that the callback can't handle or
2895 /// the callback indicates that the Loop should stop. If the callback
2896 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2897 /// `Err(Error)` is returned.
2898 ///
2899 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2900 /// this, and are handled first without sending the result to the
2901 /// callback. Only after they have exceeded is the `Result` handed to
2902 /// the callback.
2903 ///
2904 /// # Examples
2905 ///
2906 /// The following example demonstrates how to sync forever while sending all
2907 /// the interesting events through a mpsc channel to another thread e.g. a
2908 /// UI thread.
2909 ///
2910 /// ```no_run
2911 /// # use std::time::Duration;
2912 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2913 /// # use url::Url;
2914 /// # async {
2915 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2916 /// # let mut client = Client::new(homeserver).await.unwrap();
2917 /// #
2918 /// use tokio::sync::mpsc::channel;
2919 ///
2920 /// let (tx, rx) = channel(100);
2921 ///
2922 /// let sync_channel = &tx;
2923 /// let sync_settings = SyncSettings::new()
2924 /// .timeout(Duration::from_secs(30));
2925 ///
2926 /// client
2927 /// .sync_with_result_callback(sync_settings, |response| async move {
2928 /// let channel = sync_channel;
2929 /// let sync_response = response?;
2930 /// for (room_id, room) in sync_response.rooms.joined {
2931 /// for event in room.timeline.events {
2932 /// channel.send(event).await.unwrap();
2933 /// }
2934 /// }
2935 ///
2936 /// Ok(LoopCtrl::Continue)
2937 /// })
2938 /// .await;
2939 /// };
2940 /// ```
2941 #[instrument(skip(self, callback))]
2942 pub async fn sync_with_result_callback<C>(
2943 &self,
2944 sync_settings: crate::config::SyncSettings,
2945 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2946 ) -> Result<(), Error>
2947 where
2948 C: Future<Output = Result<LoopCtrl, Error>>,
2949 {
2950 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2951
2952 while let Some(result) = sync_stream.next().await {
2953 trace!("Running callback");
2954 if callback(result).await? == LoopCtrl::Break {
2955 trace!("Callback told us to stop");
2956 break;
2957 }
2958 trace!("Done running callback");
2959 }
2960
2961 Ok(())
2962 }
2963
2964 //// Repeatedly synchronize the client state with the server.
2965 ///
2966 /// This method will internally call [`Client::sync_once`] in a loop and is
2967 /// equivalent to the [`Client::sync`] method but the responses are provided
2968 /// as an async stream.
2969 ///
2970 /// # Arguments
2971 ///
2972 /// * `sync_settings` - Settings for the sync call. *Note* that those
2973 /// settings will be only used for the first sync call. See the argument
2974 /// docs for [`Client::sync_once`] for more info.
2975 ///
2976 /// # Examples
2977 ///
2978 /// ```no_run
2979 /// # use url::Url;
2980 /// # async {
2981 /// # let homeserver = Url::parse("http://localhost:8080")?;
2982 /// # let username = "";
2983 /// # let password = "";
2984 /// use futures_util::StreamExt;
2985 /// use matrix_sdk::{Client, config::SyncSettings};
2986 ///
2987 /// let client = Client::new(homeserver).await?;
2988 /// client.matrix_auth().login_username(&username, &password).send().await?;
2989 ///
2990 /// let mut sync_stream =
2991 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2992 ///
2993 /// while let Some(Ok(response)) = sync_stream.next().await {
2994 /// for room in response.rooms.joined.values() {
2995 /// for e in &room.timeline.events {
2996 /// if let Ok(event) = e.raw().deserialize() {
2997 /// println!("Received event {:?}", event);
2998 /// }
2999 /// }
3000 /// }
3001 /// }
3002 ///
3003 /// # anyhow::Ok(()) };
3004 /// ```
3005 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3006 #[instrument(skip(self))]
3007 pub async fn sync_stream(
3008 &self,
3009 mut sync_settings: crate::config::SyncSettings,
3010 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3011 let mut is_first_sync = true;
3012 let mut timeout = None;
3013 let mut last_sync_time: Option<Instant> = None;
3014
3015 let parent_span = Span::current();
3016
3017 async_stream::stream!({
3018 loop {
3019 trace!("Syncing");
3020
3021 if sync_settings.ignore_timeout_on_first_sync {
3022 if is_first_sync {
3023 timeout = sync_settings.timeout.take();
3024 } else if sync_settings.timeout.is_none() && timeout.is_some() {
3025 sync_settings.timeout = timeout.take();
3026 }
3027
3028 is_first_sync = false;
3029 }
3030
3031 yield self
3032 .sync_loop_helper(&mut sync_settings)
3033 .instrument(parent_span.clone())
3034 .await;
3035
3036 Client::delay_sync(&mut last_sync_time).await
3037 }
3038 })
3039 }
3040
3041 /// Get the current, if any, sync token of the client.
3042 /// This will be None if the client didn't sync at least once.
3043 pub(crate) async fn sync_token(&self) -> Option<String> {
3044 self.inner.base_client.sync_token().await
3045 }
3046
3047 /// Gets information about the owner of a given access token.
3048 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3049 let request = whoami::v3::Request::new();
3050 self.send(request).await
3051 }
3052
3053 /// Subscribes a new receiver to client SessionChange broadcasts.
3054 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3055 let broadcast = &self.auth_ctx().session_change_sender;
3056 broadcast.subscribe()
3057 }
3058
3059 /// Sets the save/restore session callbacks.
3060 ///
3061 /// This is another mechanism to get synchronous updates to session tokens,
3062 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3063 pub fn set_session_callbacks(
3064 &self,
3065 reload_session_callback: Box<ReloadSessionCallback>,
3066 save_session_callback: Box<SaveSessionCallback>,
3067 ) -> Result<()> {
3068 self.inner
3069 .auth_ctx
3070 .reload_session_callback
3071 .set(reload_session_callback)
3072 .map_err(|_| Error::MultipleSessionCallbacks)?;
3073
3074 self.inner
3075 .auth_ctx
3076 .save_session_callback
3077 .set(save_session_callback)
3078 .map_err(|_| Error::MultipleSessionCallbacks)?;
3079
3080 Ok(())
3081 }
3082
3083 /// Get the notification settings of the current owner of the client.
3084 pub async fn notification_settings(&self) -> NotificationSettings {
3085 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3086 NotificationSettings::new(self.clone(), ruleset)
3087 }
3088
3089 /// Create a new specialized `Client` that can process notifications.
3090 ///
3091 /// See [`CrossProcessLock::new`] to learn more about
3092 /// `cross_process_store_locks_holder_name`.
3093 ///
3094 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3095 pub async fn notification_client(
3096 &self,
3097 cross_process_store_locks_holder_name: String,
3098 ) -> Result<Client> {
3099 let client = Client {
3100 inner: ClientInner::new(
3101 self.inner.auth_ctx.clone(),
3102 self.server().cloned(),
3103 self.homeserver(),
3104 self.sliding_sync_version(),
3105 self.inner.http_client.clone(),
3106 self.inner
3107 .base_client
3108 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
3109 .await?,
3110 self.inner.caches.supported_versions.read().await.clone(),
3111 self.inner.caches.well_known.read().await.clone(),
3112 self.inner.respect_login_well_known,
3113 self.inner.event_cache.clone(),
3114 self.inner.send_queue_data.clone(),
3115 self.inner.latest_events.clone(),
3116 #[cfg(feature = "e2e-encryption")]
3117 self.inner.e2ee.encryption_settings,
3118 #[cfg(feature = "e2e-encryption")]
3119 self.inner.enable_share_history_on_invite,
3120 cross_process_store_locks_holder_name,
3121 #[cfg(feature = "experimental-search")]
3122 self.inner.search_index.clone(),
3123 self.inner.thread_subscription_catchup.clone(),
3124 )
3125 .await,
3126 };
3127
3128 Ok(client)
3129 }
3130
3131 /// The [`EventCache`] instance for this [`Client`].
3132 pub fn event_cache(&self) -> &EventCache {
3133 // SAFETY: always initialized in the `Client` ctor.
3134 self.inner.event_cache.get().unwrap()
3135 }
3136
3137 /// The [`LatestEvents`] instance for this [`Client`].
3138 pub async fn latest_events(&self) -> &LatestEvents {
3139 self.inner
3140 .latest_events
3141 .get_or_init(|| async {
3142 LatestEvents::new(
3143 WeakClient::from_client(self),
3144 self.event_cache().clone(),
3145 SendQueue::new(self.clone()),
3146 self.room_info_notable_update_receiver(),
3147 )
3148 })
3149 .await
3150 }
3151
3152 /// Waits until an at least partially synced room is received, and returns
3153 /// it.
3154 ///
3155 /// **Note: this function will loop endlessly until either it finds the room
3156 /// or an externally set timeout happens.**
3157 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3158 loop {
3159 if let Some(room) = self.get_room(room_id) {
3160 if room.is_state_partially_or_fully_synced() {
3161 debug!("Found just created room!");
3162 return room;
3163 }
3164 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3165 } else {
3166 debug!("Room wasn't found, waiting for sync beat to try again");
3167 }
3168 self.inner.sync_beat.listen().await;
3169 }
3170 }
3171
3172 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3173 /// join it.
3174 pub async fn knock(
3175 &self,
3176 room_id_or_alias: OwnedRoomOrAliasId,
3177 reason: Option<String>,
3178 server_names: Vec<OwnedServerName>,
3179 ) -> Result<Room> {
3180 let request =
3181 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3182 let response = self.send(request).await?;
3183 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3184 Ok(Room::new(self.clone(), base_room))
3185 }
3186
3187 /// Checks whether the provided `user_id` belongs to an ignored user.
3188 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3189 self.base_client().is_user_ignored(user_id).await
3190 }
3191
3192 /// Gets the `max_upload_size` value from the homeserver, getting either a
3193 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3194 /// missing.
3195 ///
3196 /// Check the spec for more info:
3197 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3198 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3199 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3200 if let Some(data) = max_upload_size_lock.get() {
3201 return Ok(data.to_owned());
3202 }
3203
3204 // Use the authenticated endpoint when the server supports it.
3205 let supported_versions = self.supported_versions().await?;
3206 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3207 .is_supported(&supported_versions);
3208
3209 let upload_size = if use_auth {
3210 self.send(authenticated_media::get_media_config::v1::Request::default())
3211 .await?
3212 .upload_size
3213 } else {
3214 #[allow(deprecated)]
3215 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3216 };
3217
3218 match max_upload_size_lock.set(upload_size) {
3219 Ok(_) => Ok(upload_size),
3220 Err(error) => {
3221 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3222 }
3223 }
3224 }
3225
/// Returns the settings applied when decrypting events, as configured on
/// the base client.
#[cfg(feature = "e2e-encryption")]
pub fn decryption_settings(&self) -> &DecryptionSettings {
    let base_client = self.base_client();
    &base_client.decryption_settings
}
3231
/// Gives access to the [`SearchIndex`] owned by this [`Client`].
#[cfg(feature = "experimental-search")]
pub fn search_index(&self) -> &SearchIndex {
    let inner = &self.inner;
    &inner.search_index
}
3237
3238 /// Whether the client is configured to take thread subscriptions (MSC4306
3239 /// and MSC4308) into account.
3240 ///
3241 /// This may cause filtering out of thread subscriptions, and loading the
3242 /// thread subscriptions via the sliding sync extension, when the room
3243 /// list service is being used.
3244 pub fn enabled_thread_subscriptions(&self) -> bool {
3245 match self.base_client().threading_support {
3246 ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
3247 ThreadingSupport::Disabled => false,
3248 }
3249 }
3250
3251 /// Fetch thread subscriptions changes between `from` and up to `to`.
3252 ///
3253 /// The `limit` optional parameter can be used to limit the number of
3254 /// entries in a response. It can also be overridden by the server, if
3255 /// it's deemed too large.
3256 pub async fn fetch_thread_subscriptions(
3257 &self,
3258 from: Option<String>,
3259 to: Option<String>,
3260 limit: Option<UInt>,
3261 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3262 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3263 from,
3264 to,
3265 limit,
3266 });
3267 Ok(self.send(request).await?)
3268 }
3269
3270 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3271 self.inner.thread_subscription_catchup.get().unwrap()
3272 }
3273
3274 /// Perform database optimizations if any are available, i.e. vacuuming in
3275 /// SQLite.
3276 ///
3277 /// **Warning:** this was added to check if SQLite fragmentation was the
3278 /// source of performance issues, **DO NOT use in production**.
3279 #[doc(hidden)]
3280 pub async fn optimize_stores(&self) -> Result<()> {
3281 trace!("Optimizing state store...");
3282 self.state_store().optimize().await?;
3283
3284 trace!("Optimizing event cache store...");
3285 if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3286 clean_lock.optimize().await?;
3287 }
3288
3289 trace!("Optimizing media store...");
3290 self.media_store().lock().await?.optimize().await?;
3291
3292 Ok(())
3293 }
3294
3295 /// Returns the sizes of the existing stores, if known.
3296 pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3297 #[cfg(feature = "e2e-encryption")]
3298 let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3299 && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3300 {
3301 Some(store_size)
3302 } else {
3303 None
3304 };
3305 #[cfg(not(feature = "e2e-encryption"))]
3306 let crypto_store_size = None;
3307
3308 let state_store_size = self.state_store().get_size().await.ok().flatten();
3309
3310 let event_cache_store_size = if let Some(clean_lock) =
3311 self.event_cache_store().lock().await?.as_clean()
3312 && let Ok(Some(store_size)) = clean_lock.get_size().await
3313 {
3314 Some(store_size)
3315 } else {
3316 None
3317 };
3318
3319 let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3320
3321 Ok(StoreSizes {
3322 crypto_store: crypto_store_size,
3323 state_store: state_store_size,
3324 event_cache_store: event_cache_store_size,
3325 media_store: media_store_size,
3326 })
3327 }
3328
3329 /// Get a reference to the client's task monitor, for spawning background
3330 /// tasks.
3331 pub fn task_monitor(&self) -> &TaskMonitor {
3332 &self.inner.task_monitor
3333 }
3334
/// Creates a new receiver for the duplicate key upload error notifications
/// that are triggered by requests to /keys/upload.
#[cfg(feature = "e2e-encryption")]
pub fn subscribe_to_duplicate_key_upload_errors(
    &self,
) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
    let sender = &self.inner.duplicate_key_upload_error_sender;
    sender.subscribe()
}
3343}
3344
/// Disk sizes of the client's stores.
///
/// Every field is optional: a size is `None` when it is not known, which is
/// the case for in-memory stores.
#[derive(Debug, Clone)]
pub struct StoreSizes {
    /// Size of the CryptoStore, if known.
    pub crypto_store: Option<usize>,
    /// Size of the StateStore, if known.
    pub state_store: Option<usize>,
    /// Size of the EventCacheStore, if known.
    pub event_cache_store: Option<usize>,
    /// Size of the MediaStore, if known.
    pub media_store: Option<usize>,
}
3358
/// Helpers that are only compiled for tests or with the `testing` feature.
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper that marks `user_ids` as tracked by the crypto layer.
    ///
    /// # Panics
    ///
    /// Panics if the client has no Olm machine, or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();
        let outcome = machine.update_tracked_users(user_ids).await;
        outcome.unwrap();
    }
}
3372
3373/// A weak reference to the inner client, useful when trying to get a handle
3374/// on the owning client.
3375#[derive(Clone, Debug)]
3376pub(crate) struct WeakClient {
3377 client: Weak<ClientInner>,
3378}
3379
3380impl WeakClient {
3381 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3382 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3383 Self { client: Arc::downgrade(client) }
3384 }
3385
3386 /// Construct a [`WeakClient`] from a [`Client`].
3387 pub fn from_client(client: &Client) -> Self {
3388 Self::from_inner(&client.inner)
3389 }
3390
3391 /// Attempts to get a [`Client`] from this [`WeakClient`].
3392 pub fn get(&self) -> Option<Client> {
3393 self.client.upgrade().map(|inner| Client { inner })
3394 }
3395
3396 /// Gets the number of strong (`Arc`) pointers still pointing to this
3397 /// client.
3398 #[allow(dead_code)]
3399 pub fn strong_count(&self) -> usize {
3400 self.client.strong_count()
3401 }
3402}
3403
3404/// Information about the state of a room before we joined it.
3405#[derive(Debug, Clone, Default)]
3406struct PreJoinRoomInfo {
3407 /// The user who invited us to the room, if any.
3408 pub inviter: Option<RoomMember>,
3409}
3410
3411// The http mocking library is not supported for wasm32
3412#[cfg(all(test, not(target_family = "wasm")))]
3413pub(crate) mod tests {
3414 use std::{sync::Arc, time::Duration};
3415
3416 use assert_matches::assert_matches;
3417 use assert_matches2::assert_let;
3418 use eyeball::SharedObservable;
3419 use futures_util::{FutureExt, StreamExt, pin_mut};
3420 use js_int::{UInt, uint};
3421 use matrix_sdk_base::{
3422 RoomState,
3423 store::{MemoryStore, StoreConfig},
3424 };
3425 use matrix_sdk_test::{
3426 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3427 event_factory::EventFactory,
3428 };
3429 #[cfg(target_family = "wasm")]
3430 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3431
3432 use ruma::{
3433 RoomId, ServerName, UserId,
3434 api::{
3435 FeatureFlag, MatrixVersion,
3436 client::{
3437 discovery::discover_homeserver::RtcFocusInfo,
3438 room::create_room::v3::Request as CreateRoomRequest,
3439 },
3440 },
3441 assign,
3442 events::{
3443 ignored_user_list::IgnoredUserListEventContent,
3444 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3445 },
3446 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3447 };
3448 use serde_json::json;
3449 use stream_assert::{assert_next_matches, assert_pending};
3450 use tokio::{
3451 spawn,
3452 time::{sleep, timeout},
3453 };
3454 use url::Url;
3455
3456 use super::Client;
3457 use crate::{
3458 Error, TransmissionProgress,
3459 client::{WeakClient, futures::SendMediaUploadRequest},
3460 config::{RequestConfig, SyncSettings},
3461 futures::SendRequest,
3462 media::MediaError,
3463 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3464 };
3465
3466 #[async_test]
3467 async fn test_account_data() {
3468 let server = MatrixMockServer::new().await;
3469 let client = server.client_builder().build().await;
3470
3471 let f = EventFactory::new();
3472 server
3473 .mock_sync()
3474 .ok_and_run(&client, |builder| {
3475 builder.add_global_account_data(
3476 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3477 );
3478 })
3479 .await;
3480
3481 let content = client
3482 .account()
3483 .account_data::<IgnoredUserListEventContent>()
3484 .await
3485 .unwrap()
3486 .unwrap()
3487 .deserialize()
3488 .unwrap();
3489
3490 assert_eq!(content.ignored_users.len(), 1);
3491 }
3492
3493 #[async_test]
3494 async fn test_successful_discovery() {
3495 // Imagine this is `matrix.org`.
3496 let server = MatrixMockServer::new().await;
3497 let server_url = server.uri();
3498
3499 // Imagine this is `matrix-client.matrix.org`.
3500 let homeserver = MatrixMockServer::new().await;
3501 let homeserver_url = homeserver.uri();
3502
3503 // Imagine Alice has the user ID `@alice:matrix.org`.
3504 let domain = server_url.strip_prefix("http://").unwrap();
3505 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3506
3507 // The `.well-known` is on the server (e.g. `matrix.org`).
3508 server
3509 .mock_well_known()
3510 .ok_with_homeserver_url(&homeserver_url)
3511 .mock_once()
3512 .named("well-known")
3513 .mount()
3514 .await;
3515
3516 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3517 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3518
3519 let client = Client::builder()
3520 .insecure_server_name_no_tls(alice.server_name())
3521 .build()
3522 .await
3523 .unwrap();
3524
3525 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3526 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3527 client.server_versions().await.unwrap();
3528 }
3529
3530 #[async_test]
3531 async fn test_discovery_broken_server() {
3532 let server = MatrixMockServer::new().await;
3533 let server_url = server.uri();
3534 let domain = server_url.strip_prefix("http://").unwrap();
3535 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3536
3537 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3538
3539 assert!(
3540 Client::builder()
3541 .insecure_server_name_no_tls(alice.server_name())
3542 .build()
3543 .await
3544 .is_err(),
3545 "Creating a client from a user ID should fail when the .well-known request fails."
3546 );
3547 }
3548
3549 #[async_test]
3550 async fn test_room_creation() {
3551 let server = MatrixMockServer::new().await;
3552 let client = server.client_builder().build().await;
3553
3554 server
3555 .mock_sync()
3556 .ok_and_run(&client, |builder| {
3557 builder.add_joined_room(
3558 JoinedRoomBuilder::default()
3559 .add_state_event(StateTestEvent::Member)
3560 .add_state_event(StateTestEvent::PowerLevels),
3561 );
3562 })
3563 .await;
3564
3565 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3566 assert_eq!(room.state(), RoomState::Joined);
3567 }
3568
3569 #[async_test]
3570 async fn test_retry_limit_http_requests() {
3571 let server = MatrixMockServer::new().await;
3572 let client = server
3573 .client_builder()
3574 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3575 .build()
3576 .await;
3577
3578 assert!(client.request_config().retry_limit.unwrap() == 4);
3579
3580 server.mock_who_am_i().error500().expect(4).mount().await;
3581
3582 client.whoami().await.unwrap_err();
3583 }
3584
3585 #[async_test]
3586 async fn test_retry_timeout_http_requests() {
3587 // Keep this timeout small so that the test doesn't take long
3588 let retry_timeout = Duration::from_secs(5);
3589 let server = MatrixMockServer::new().await;
3590 let client = server
3591 .client_builder()
3592 .on_builder(|builder| {
3593 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3594 })
3595 .build()
3596 .await;
3597
3598 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3599
3600 server.mock_login().error500().expect(2..).mount().await;
3601
3602 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3603 }
3604
3605 #[async_test]
3606 async fn test_short_retry_initial_http_requests() {
3607 let server = MatrixMockServer::new().await;
3608 let client = server
3609 .client_builder()
3610 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3611 .build()
3612 .await;
3613
3614 server.mock_login().error500().expect(3..).mount().await;
3615
3616 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3617 }
3618
3619 #[async_test]
3620 async fn test_no_retry_http_requests() {
3621 let server = MatrixMockServer::new().await;
3622 let client = server.client_builder().build().await;
3623
3624 server.mock_devices().error500().mock_once().mount().await;
3625
3626 client.devices().await.unwrap_err();
3627 }
3628
3629 #[async_test]
3630 async fn test_set_homeserver() {
3631 let client = MockClientBuilder::new(None).build().await;
3632 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3633
3634 let homeserver = Url::parse("http://example.com/").unwrap();
3635 client.set_homeserver(homeserver.clone());
3636 assert_eq!(client.homeserver(), homeserver);
3637 }
3638
3639 #[async_test]
3640 async fn test_search_user_request() {
3641 let server = MatrixMockServer::new().await;
3642 let client = server.client_builder().build().await;
3643
3644 server.mock_user_directory().ok().mock_once().mount().await;
3645
3646 let response = client.search_users("test", 50).await.unwrap();
3647 assert_eq!(response.results.len(), 1);
3648 let result = &response.results[0];
3649 assert_eq!(result.user_id.to_string(), "@test:example.me");
3650 assert_eq!(result.display_name.clone().unwrap(), "Test");
3651 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3652 assert!(!response.limited);
3653 }
3654
3655 #[async_test]
3656 async fn test_request_unstable_features() {
3657 let server = MatrixMockServer::new().await;
3658 let client = server.client_builder().no_server_versions().build().await;
3659
3660 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3661
3662 let unstable_features = client.unstable_features().await.unwrap();
3663 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3664 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3665 }
3666
3667 #[async_test]
3668 async fn test_can_homeserver_push_encrypted_event_to_device() {
3669 let server = MatrixMockServer::new().await;
3670 let client = server.client_builder().no_server_versions().build().await;
3671
3672 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3673
3674 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3675 assert!(msc4028_enabled);
3676 }
3677
3678 #[async_test]
3679 async fn test_recently_visited_rooms() {
3680 // Tracking recently visited rooms requires authentication
3681 let client = MockClientBuilder::new(None).unlogged().build().await;
3682 assert_matches!(
3683 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3684 Err(Error::AuthenticationRequired)
3685 );
3686
3687 let client = MockClientBuilder::new(None).build().await;
3688 let account = client.account();
3689
3690 // We should start off with an empty list
3691 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3692
3693 // Tracking a valid room id should add it to the list
3694 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3695 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3696 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3697
3698 // And the existing list shouldn't be changed
3699 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3700 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3701
3702 // Tracking the same room again shouldn't change the list
3703 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3704 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3705 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3706
3707 // Tracking a second room should add it to the front of the list
3708 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3709 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3710 assert_eq!(
3711 account.get_recently_visited_rooms().await.unwrap(),
3712 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3713 );
3714
3715 // Tracking the first room yet again should move it to the front of the list
3716 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3717 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3718 assert_eq!(
3719 account.get_recently_visited_rooms().await.unwrap(),
3720 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3721 );
3722
3723 // Tracking should be capped at 20
3724 for n in 0..20 {
3725 account
3726 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3727 .await
3728 .unwrap();
3729 }
3730
3731 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3732
3733 // And the initial rooms should've been pushed out
3734 let rooms = account.get_recently_visited_rooms().await.unwrap();
3735 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3736 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3737
3738 // And the last tracked room should be the first
3739 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3740 }
3741
/// Checks that using the event cache doesn't keep the client alive: once the
/// only `Client` handle is dropped, the `WeakClient` must report zero strong
/// references.
#[async_test]
async fn test_client_no_cycle_with_event_cache() {
    let client = MockClientBuilder::new(None).build().await;

    // Wait for the init tasks to die.
    sleep(Duration::from_secs(1)).await;

    // At this point, `client` is expected to be the only strong reference.
    let weak_client = WeakClient::from_client(&client);
    assert_eq!(weak_client.strong_count(), 1);

    // Everything obtained from the event cache is scoped to this block, so it
    // is dropped before the client itself.
    {
        let room_id = room_id!("!room:example.org");

        // Have the client know the room.
        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::new(room_id))
            .build_sync_response();
        client.inner.base_client.receive_sync_response(response).await.unwrap();

        client.event_cache().subscribe().unwrap();

        let (_room_event_cache, _drop_handles) =
            client.get_room(room_id).unwrap().event_cache().await.unwrap();
    }

    drop(client);

    // Give a bit of time for background tasks to die.
    sleep(Duration::from_secs(1)).await;

    // The weak client must be the last reference to the client now.
    assert_eq!(weak_client.strong_count(), 0);
    let client = weak_client.get();
    // The format argument is only evaluated when the assertion fails, i.e.
    // when `client` is `Some`, so the `unwrap()` below cannot panic itself.
    assert!(
        client.is_none(),
        "too many strong references to the client: {}",
        Arc::strong_count(&client.unwrap().inner)
    );
}
3781
/// Checks the caching of the `/versions` response: the endpoint is hit once,
/// later reads are served from the in-memory cache or from the shared state
/// store, and resetting the cache triggers exactly one new request.
#[async_test]
async fn test_supported_versions_caching() {
    let server = MatrixMockServer::new().await;

    // Scoped mock expecting exactly one hit; it is dropped explicitly further
    // down, before the second mock is mounted.
    let versions_mock = server
        .mock_versions()
        .expect_default_access_token()
        .ok_with_unstable_features()
        .named("first versions mock")
        .expect(1)
        .mount_as_scoped()
        .await;

    // The same store instance is handed to both clients below, so that the
    // second client can see what the first one persisted.
    let memory_store = Arc::new(MemoryStore::new());
    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

    // The result was cached.
    assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
    // This subsequent call hits the in-memory cache.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

    drop(client);

    // Build a second client on top of the same state store.
    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // These calls to the new client hit the on-disk cache (here: the shared
    // `MemoryStore`), as the first mock only allows a single request.
    assert!(
        client
            .unstable_features()
            .await
            .unwrap()
            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
    );
    let supported = client.supported_versions().await.unwrap();
    assert!(supported.versions.contains(&MatrixVersion::V1_0));
    assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));

    // Then this call hits the in-memory cache.
    let supported = client.supported_versions().await.unwrap();
    assert!(supported.versions.contains(&MatrixVersion::V1_0));
    assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));

    drop(versions_mock);

    // Now, reset the cache, and observe the endpoints being called again once.
    client.reset_supported_versions().await.unwrap();

    server.mock_versions().ok().expect(1).named("second versions mock").mount().await;

    // Hits network again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    // Hits in-memory cache again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
}
3858
/// Checks the caching of the `.well-known` response, observed through
/// `rtc_foci`: later reads are served from the in-memory cache or from the
/// shared state store, and resetting the cache triggers exactly one new
/// request.
#[async_test]
async fn test_well_known_caching() {
    let server = MatrixMockServer::new().await;
    let server_url = server.uri();
    let domain = server_url.strip_prefix("http://").unwrap();
    let server_name = <&ServerName>::try_from(domain).unwrap();
    // The RTC foci every `rtc_foci()` call below is expected to return.
    let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];

    // Scoped mock; it is dropped explicitly further down, before the second
    // mock is mounted.
    let well_known_mock = server
        .mock_well_known()
        .ok()
        .named("well known mock")
        .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
        .mount_as_scoped()
        .await;

    // The same store instance is handed to both clients below, so that the
    // second client can see what the first one persisted.
    let memory_store = Arc::new(MemoryStore::new());
    let client = Client::builder()
        .insecure_server_name_no_tls(server_name)
        .store_config(
            StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                .state_store(memory_store.clone()),
        )
        .build()
        .await
        .unwrap();

    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // This subsequent call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    drop(client);

    // Build a second client on top of the same state store.
    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // This call to the new client hits the on-disk cache (here: the shared
    // `MemoryStore`).
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // Then this call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    drop(well_known_mock);

    // Now, reset the cache, and observe the endpoints being called again once.
    client.reset_well_known().await.unwrap();

    server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;

    // Hits network again.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
    // Hits in-memory cache again.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
}
3923
/// Checks that a failing `.well-known` lookup is cached too: `rtc_foci`
/// returns an empty list, the endpoint is hit once, and resetting the cache
/// triggers exactly one new request.
#[async_test]
async fn test_missing_well_known_caching() {
    let server = MatrixMockServer::new().await;
    // No RTC focus is expected, as the `.well-known` request fails.
    let rtc_foci: Vec<RtcFocusInfo> = vec![];

    // Scoped mock expecting exactly one hit; it is dropped explicitly further
    // down, before the second mock is mounted.
    let well_known_mock = server
        .mock_well_known()
        .error_unrecognized()
        .named("first well-known mock")
        .expect(1)
        .mount_as_scoped()
        .await;

    // The same store instance is handed to both clients below, so that the
    // second client can see what the first one persisted.
    let memory_store = Arc::new(MemoryStore::new());
    let client = server
        .client_builder()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // This subsequent call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    drop(client);

    // Build a second client on top of the same state store.
    let client = server
        .client_builder()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // This call to the new client hits the on-disk cache (here: the shared
    // `MemoryStore`), as the first mock only allows a single request.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // Then this call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    drop(well_known_mock);

    // Now, reset the cache, and observe the endpoints being called again once.
    client.reset_well_known().await.unwrap();

    server
        .mock_well_known()
        .error_unrecognized()
        .expect(1)
        .named("second well-known mock")
        .mount()
        .await;

    // Hits network again.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
    // Hits in-memory cache again.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
}
3991
3992 #[async_test]
3993 async fn test_no_network_doesnt_cause_infinite_retries() {
3994 // We want infinite retries for transient errors.
3995 let client = MockClientBuilder::new(None)
3996 .on_builder(|builder| builder.request_config(RequestConfig::new()))
3997 .build()
3998 .await;
3999
4000 // We don't define a mock server on purpose here, so that the error is really a
4001 // network error.
4002 client.whoami().await.unwrap_err();
4003 }
4004
4005 #[async_test]
4006 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4007 let server = MatrixMockServer::new().await;
4008 let client = server.client_builder().build().await;
4009
4010 let room_id = room_id!("!room:example.org");
4011
4012 server
4013 .mock_sync()
4014 .ok_and_run(&client, |builder| {
4015 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4016 })
4017 .await;
4018
4019 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4020 assert_eq!(room.room_id(), room_id);
4021 }
4022
4023 #[async_test]
4024 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4025 let server = MatrixMockServer::new().await;
4026 let client = server.client_builder().build().await;
4027
4028 let room_id = room_id!("!room:example.org");
4029
4030 let client = Arc::new(client);
4031
4032 // Perform the /sync request with a delay so it starts after the
4033 // `await_room_remote_echo` call has happened
4034 spawn({
4035 let client = client.clone();
4036 async move {
4037 sleep(Duration::from_millis(100)).await;
4038
4039 server
4040 .mock_sync()
4041 .ok_and_run(&client, |builder| {
4042 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4043 })
4044 .await;
4045 }
4046 });
4047
4048 let room =
4049 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4050 assert_eq!(room.room_id(), room_id);
4051 }
4052
4053 #[async_test]
4054 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4055 let client = MockClientBuilder::new(None).build().await;
4056
4057 let room_id = room_id!("!room:example.org");
4058 // Room is not present so the client won't be able to find it. The call will
4059 // timeout.
4060 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4061 }
4062
4063 #[async_test]
4064 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4065 let server = MatrixMockServer::new().await;
4066 let client = server.client_builder().build().await;
4067
4068 server.mock_create_room().ok().mount().await;
4069
4070 // Create a room in the internal store
4071 let room = client
4072 .create_room(assign!(CreateRoomRequest::new(), {
4073 invite: vec![],
4074 is_direct: false,
4075 }))
4076 .await
4077 .unwrap();
4078
4079 // Room is locally present, but not synced, the call will timeout
4080 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4081 .await
4082 .unwrap_err();
4083 }
4084
4085 #[async_test]
4086 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4087 let server = MatrixMockServer::new().await;
4088 let client = server.client_builder().build().await;
4089
4090 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4091
4092 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4093 assert_matches!(ret, Ok(true));
4094 }
4095
4096 #[async_test]
4097 async fn test_is_room_alias_available_if_alias_is_resolved() {
4098 let server = MatrixMockServer::new().await;
4099 let client = server.client_builder().build().await;
4100
4101 server
4102 .mock_room_directory_resolve_alias()
4103 .ok("!some_room_id:matrix.org", Vec::new())
4104 .expect(1)
4105 .mount()
4106 .await;
4107
4108 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4109 assert_matches!(ret, Ok(false));
4110 }
4111
4112 #[async_test]
4113 async fn test_is_room_alias_available_if_error_found() {
4114 let server = MatrixMockServer::new().await;
4115 let client = server.client_builder().build().await;
4116
4117 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4118
4119 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4120 assert_matches!(ret, Err(_));
4121 }
4122
4123 #[async_test]
4124 async fn test_create_room_alias() {
4125 let server = MatrixMockServer::new().await;
4126 let client = server.client_builder().build().await;
4127
4128 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4129
4130 let ret = client
4131 .create_room_alias(
4132 room_alias_id!("#some_alias:matrix.org"),
4133 room_id!("!some_room:matrix.org"),
4134 )
4135 .await;
4136 assert_matches!(ret, Ok(()));
4137 }
4138
4139 #[async_test]
4140 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4141 let server = MatrixMockServer::new().await;
4142 let client = server.client_builder().build().await;
4143
4144 let room_id = room_id!("!a-room:matrix.org");
4145
4146 // Make sure the summary endpoint is called once
4147 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4148
4149 // We create a locally cached invited room
4150 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4151
4152 // And we get a preview, the server endpoint was reached
4153 let preview = client
4154 .get_room_preview(room_id.into(), Vec::new())
4155 .await
4156 .expect("Room preview should be retrieved");
4157
4158 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
4159 }
4160
4161 #[async_test]
4162 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4163 let server = MatrixMockServer::new().await;
4164 let client = server.client_builder().build().await;
4165
4166 let room_id = room_id!("!a-room:matrix.org");
4167
4168 // Make sure the summary endpoint is called once
4169 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4170
4171 // We create a locally cached left room
4172 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4173
4174 // And we get a preview, the server endpoint was reached
4175 let preview = client
4176 .get_room_preview(room_id.into(), Vec::new())
4177 .await
4178 .expect("Room preview should be retrieved");
4179
4180 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
4181 }
4182
4183 #[async_test]
4184 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4185 let server = MatrixMockServer::new().await;
4186 let client = server.client_builder().build().await;
4187
4188 let room_id = room_id!("!a-room:matrix.org");
4189
4190 // Make sure the summary endpoint is called once
4191 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4192
4193 // We create a locally cached knocked room
4194 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4195
4196 // And we get a preview, the server endpoint was reached
4197 let preview = client
4198 .get_room_preview(room_id.into(), Vec::new())
4199 .await
4200 .expect("Room preview should be retrieved");
4201
4202 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
4203 }
4204
4205 #[async_test]
4206 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4207 let server = MatrixMockServer::new().await;
4208 let client = server.client_builder().build().await;
4209
4210 let room_id = room_id!("!a-room:matrix.org");
4211
4212 // Make sure the summary endpoint is not called
4213 server.mock_room_summary().ok(room_id).never().mount().await;
4214
4215 // We create a locally cached joined room
4216 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4217
4218 // And we get a preview, no server endpoint was reached
4219 let preview = client
4220 .get_room_preview(room_id.into(), Vec::new())
4221 .await
4222 .expect("Room preview should be retrieved");
4223
4224 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
4225 }
4226
4227 #[async_test]
4228 async fn test_media_preview_config() {
4229 let server = MatrixMockServer::new().await;
4230 let client = server.client_builder().build().await;
4231
4232 server
4233 .mock_sync()
4234 .ok_and_run(&client, |builder| {
4235 builder.add_custom_global_account_data(json!({
4236 "content": {
4237 "media_previews": "private",
4238 "invite_avatars": "off"
4239 },
4240 "type": "m.media_preview_config"
4241 }));
4242 })
4243 .await;
4244
4245 let (initial_value, stream) =
4246 client.account().observe_media_preview_config().await.unwrap();
4247
4248 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4249 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4250 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4251 pin_mut!(stream);
4252 assert_pending!(stream);
4253
4254 server
4255 .mock_sync()
4256 .ok_and_run(&client, |builder| {
4257 builder.add_custom_global_account_data(json!({
4258 "content": {
4259 "media_previews": "off",
4260 "invite_avatars": "on"
4261 },
4262 "type": "m.media_preview_config"
4263 }));
4264 })
4265 .await;
4266
4267 assert_next_matches!(
4268 stream,
4269 MediaPreviewConfigEventContent {
4270 media_previews: Some(MediaPreviews::Off),
4271 invite_avatars: Some(InviteAvatars::On),
4272 ..
4273 }
4274 );
4275 assert_pending!(stream);
4276 }
4277
4278 #[async_test]
4279 async fn test_unstable_media_preview_config() {
4280 let server = MatrixMockServer::new().await;
4281 let client = server.client_builder().build().await;
4282
4283 server
4284 .mock_sync()
4285 .ok_and_run(&client, |builder| {
4286 builder.add_custom_global_account_data(json!({
4287 "content": {
4288 "media_previews": "private",
4289 "invite_avatars": "off"
4290 },
4291 "type": "io.element.msc4278.media_preview_config"
4292 }));
4293 })
4294 .await;
4295
4296 let (initial_value, stream) =
4297 client.account().observe_media_preview_config().await.unwrap();
4298
4299 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4300 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4301 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4302 pin_mut!(stream);
4303 assert_pending!(stream);
4304
4305 server
4306 .mock_sync()
4307 .ok_and_run(&client, |builder| {
4308 builder.add_custom_global_account_data(json!({
4309 "content": {
4310 "media_previews": "off",
4311 "invite_avatars": "on"
4312 },
4313 "type": "io.element.msc4278.media_preview_config"
4314 }));
4315 })
4316 .await;
4317
4318 assert_next_matches!(
4319 stream,
4320 MediaPreviewConfigEventContent {
4321 media_previews: Some(MediaPreviews::Off),
4322 invite_avatars: Some(InviteAvatars::On),
4323 ..
4324 }
4325 );
4326 assert_pending!(stream);
4327 }
4328
4329 #[async_test]
4330 async fn test_media_preview_config_not_found() {
4331 let server = MatrixMockServer::new().await;
4332 let client = server.client_builder().build().await;
4333
4334 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4335
4336 assert!(initial_value.is_none());
4337 }
4338
4339 #[async_test]
4340 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4341 // The default Matrix version we use is 1.11 or higher, so authenticated media
4342 // is supported.
4343 let server = MatrixMockServer::new().await;
4344 let client = server.client_builder().build().await;
4345
4346 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4347
4348 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4349 client.load_or_fetch_max_upload_size().await.unwrap();
4350
4351 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4352 }
4353
4354 #[async_test]
4355 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4356 // The server must advertise support for the stable feature for authenticated
4357 // media support, so we mock the `GET /versions` response.
4358 let server = MatrixMockServer::new().await;
4359 let client = server.client_builder().no_server_versions().build().await;
4360
4361 server
4362 .mock_versions()
4363 .ok_custom(
4364 &["v1.7", "v1.8", "v1.9", "v1.10"],
4365 &[("org.matrix.msc3916.stable", true)].into(),
4366 )
4367 .named("versions")
4368 .expect(1)
4369 .mount()
4370 .await;
4371
4372 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4373
4374 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4375 client.load_or_fetch_max_upload_size().await.unwrap();
4376
4377 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4378 }
4379
4380 #[async_test]
4381 async fn test_load_or_fetch_max_upload_size_no_auth() {
4382 // The server must not support Matrix 1.11 or higher for unauthenticated
4383 // media requests, so we mock the `GET /versions` response.
4384 let server = MatrixMockServer::new().await;
4385 let client = server.client_builder().no_server_versions().build().await;
4386
4387 server
4388 .mock_versions()
4389 .ok_custom(&["v1.1"], &Default::default())
4390 .named("versions")
4391 .expect(1)
4392 .mount()
4393 .await;
4394
4395 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4396
4397 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4398 client.load_or_fetch_max_upload_size().await.unwrap();
4399
4400 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4401 }
4402
4403 #[async_test]
4404 async fn test_uploading_a_too_large_media_file() {
4405 let server = MatrixMockServer::new().await;
4406 let client = server.client_builder().build().await;
4407
4408 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4409 client.load_or_fetch_max_upload_size().await.unwrap();
4410 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4411
4412 let data = vec![1, 2];
4413 let upload_request =
4414 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4415 let request = SendRequest {
4416 client: client.clone(),
4417 request: upload_request,
4418 config: None,
4419 send_progress: SharedObservable::new(TransmissionProgress::default()),
4420 };
4421 let media_request = SendMediaUploadRequest::new(request);
4422
4423 let error = media_request.await.err();
4424 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4425 assert_eq!(max, uint!(1));
4426 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4427 }
4428
4429 #[async_test]
4430 async fn test_dont_ignore_timeout_on_first_sync() {
4431 let server = MatrixMockServer::new().await;
4432 let client = server.client_builder().build().await;
4433
4434 server
4435 .mock_sync()
4436 .timeout(Some(Duration::from_secs(30)))
4437 .ok(|_| {})
4438 .mock_once()
4439 .named("sync_with_timeout")
4440 .mount()
4441 .await;
4442
4443 // Call the endpoint once to check the timeout.
4444 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4445
4446 timeout(Duration::from_secs(1), async {
4447 stream.next().await.unwrap().unwrap();
4448 })
4449 .await
4450 .unwrap();
4451 }
4452
4453 #[async_test]
4454 async fn test_ignore_timeout_on_first_sync() {
4455 let server = MatrixMockServer::new().await;
4456 let client = server.client_builder().build().await;
4457
4458 server
4459 .mock_sync()
4460 .timeout(None)
4461 .ok(|_| {})
4462 .mock_once()
4463 .named("sync_no_timeout")
4464 .mount()
4465 .await;
4466 server
4467 .mock_sync()
4468 .timeout(Some(Duration::from_secs(30)))
4469 .ok(|_| {})
4470 .mock_once()
4471 .named("sync_with_timeout")
4472 .mount()
4473 .await;
4474
4475 // Call each version of the endpoint once to check the timeouts.
4476 let mut stream = Box::pin(
4477 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4478 );
4479
4480 timeout(Duration::from_secs(1), async {
4481 stream.next().await.unwrap().unwrap();
4482 stream.next().await.unwrap().unwrap();
4483 })
4484 .await
4485 .unwrap();
4486 }
4487
4488 #[async_test]
4489 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4490 let server = MatrixMockServer::new().await;
4491 let client = server.client_builder().build().await;
4492 // This is the user ID that is inside MemberAdditional.
4493 // Note the confusing username, so we can share
4494 // GlobalAccountDataTestEvent::Direct with the invited test.
4495 let user_id = user_id!("@invited:localhost");
4496
4497 // When we receive a sync response saying "invited" is invited to a DM
4498 let f = EventFactory::new();
4499 let response = SyncResponseBuilder::default()
4500 .add_joined_room(
4501 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
4502 )
4503 .add_global_account_data(
4504 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4505 )
4506 .build_sync_response();
4507 client.base_client().receive_sync_response(response).await.unwrap();
4508
4509 // Then get_dm_room finds this room
4510 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4511 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4512 }
4513
4514 #[async_test]
4515 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4516 let server = MatrixMockServer::new().await;
4517 let client = server.client_builder().build().await;
4518 // This is the user ID that is inside MemberInvite
4519 let user_id = user_id!("@invited:localhost");
4520
4521 // When we receive a sync response saying "invited" is invited to a DM
4522 let f = EventFactory::new();
4523 let response = SyncResponseBuilder::default()
4524 .add_joined_room(
4525 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
4526 )
4527 .add_global_account_data(
4528 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4529 )
4530 .build_sync_response();
4531 client.base_client().receive_sync_response(response).await.unwrap();
4532
4533 // Then get_dm_room finds this room
4534 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4535 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4536 }
4537
4538 #[async_test]
4539 async fn test_get_dm_room_still_finds_left_room() {
4540 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4541 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4542
4543 let server = MatrixMockServer::new().await;
4544 let client = server.client_builder().build().await;
4545 // This is the user ID that is inside MemberAdditional.
4546 // Note the confusing username, so we can share
4547 // GlobalAccountDataTestEvent::Direct with the invited test.
4548 let user_id = user_id!("@invited:localhost");
4549
4550 // When we receive a sync response saying "invited" is invited to a DM
4551 let f = EventFactory::new();
4552 let response = SyncResponseBuilder::default()
4553 .add_joined_room(
4554 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
4555 )
4556 .add_global_account_data(
4557 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4558 )
4559 .build_sync_response();
4560 client.base_client().receive_sync_response(response).await.unwrap();
4561
4562 // Then get_dm_room finds this room
4563 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4564 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4565 }
4566
4567 #[async_test]
4568 async fn test_device_exists() {
4569 let server = MatrixMockServer::new().await;
4570 let client = server.client_builder().build().await;
4571
4572 server.mock_get_device().ok().expect(1).mount().await;
4573
4574 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4575 }
4576
4577 #[async_test]
4578 async fn test_device_exists_404() {
4579 let server = MatrixMockServer::new().await;
4580 let client = server.client_builder().build().await;
4581
4582 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4583 }
4584
4585 #[async_test]
4586 async fn test_device_exists_500() {
4587 let server = MatrixMockServer::new().await;
4588 let client = server.client_builder().build().await;
4589
4590 server.mock_get_device().error500().expect(1).mount().await;
4591
4592 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4593 }
4594
4595 #[async_test]
4596 async fn test_fetching_well_known_with_homeserver_url() {
4597 let server = MatrixMockServer::new().await;
4598 let client = server.client_builder().build().await;
4599 server.mock_well_known().ok().mount().await;
4600
4601 assert_matches!(client.fetch_client_well_known().await, Some(_));
4602 }
4603
4604 #[async_test]
4605 async fn test_fetching_well_known_with_server_name() {
4606 let server = MatrixMockServer::new().await;
4607 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4608
4609 server.mock_well_known().ok().mount().await;
4610
4611 let client = MockClientBuilder::new(None)
4612 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4613 .build()
4614 .await;
4615
4616 assert_matches!(client.fetch_client_well_known().await, Some(_));
4617 }
4618
4619 #[async_test]
4620 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4621 let server = MatrixMockServer::new().await;
4622 server.mock_well_known().ok().mount().await;
4623
4624 let user_id =
4625 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4626 let client = MockClientBuilder::new(None)
4627 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4628 .build()
4629 .await;
4630
4631 assert_matches!(client.fetch_client_well_known().await, Some(_));
4632 }
4633}