matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32 DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
36 StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
37 event_cache::store::EventCacheStoreLock,
38 media::store::MediaStoreLock,
39 store::{
40 DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
41 WellKnownResponse,
42 },
43 sync::{Notification, RoomUpdates},
44 task_monitor::TaskMonitor,
45};
46use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl_cache::TtlCache};
47#[cfg(feature = "e2e-encryption")]
48use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
49use ruma::{
50 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
51 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
52 api::{
53 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
54 auth_scheme::{AuthScheme, SendAccessToken},
55 client::{
56 account::whoami,
57 alias::{create_alias, delete_alias, get_alias},
58 authenticated_media,
59 device::{self, delete_devices, get_devices, update_device},
60 directory::{get_public_rooms, get_public_rooms_filtered},
61 discovery::{
62 discover_homeserver::{self, RtcFocusInfo},
63 get_capabilities::{self, v3::Capabilities},
64 get_supported_versions,
65 },
66 error::ErrorKind,
67 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
68 knock::knock_room,
69 media,
70 membership::{join_room_by_id, join_room_by_id_or_alias},
71 room::create_room,
72 session::login::v3::DiscoveryInfo,
73 sync::sync_events,
74 threads::get_thread_subscriptions_changes,
75 uiaa,
76 user_directory::search_users,
77 },
78 error::FromHttpResponseError,
79 path_builder::PathBuilder,
80 },
81 assign,
82 events::direct::DirectUserIdentifier,
83 push::Ruleset,
84 time::Instant,
85};
86use serde::de::DeserializeOwned;
87use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
88use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
89use url::Url;
90
91use self::{
92 caches::{CachedValue, ClientCaches},
93 futures::SendRequest,
94};
95use crate::{
96 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
97 Room, SessionTokens, TransmissionProgress,
98 authentication::{
99 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
100 oauth::OAuth,
101 },
102 client::thread_subscriptions::ThreadSubscriptionCatchup,
103 config::{RequestConfig, SyncToken},
104 deduplicating_handler::DeduplicatingHandler,
105 error::HttpResult,
106 event_cache::EventCache,
107 event_handler::{
108 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
109 EventHandlerStore, ObservableEventHandler, SyncEvent,
110 },
111 http_client::{HttpClient, SupportedPathBuilder},
112 latest_events::LatestEvents,
113 media::MediaError,
114 notification_settings::NotificationSettings,
115 room::RoomMember,
116 room_preview::RoomPreview,
117 send_queue::{SendQueue, SendQueueData},
118 sliding_sync::Version as SlidingSyncVersion,
119 sync::{RoomUpdate, SyncResponse},
120};
121#[cfg(feature = "e2e-encryption")]
122use crate::{
123 cross_process_lock::CrossProcessLock,
124 encryption::{
125 DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
126 VerificationState,
127 },
128};
129
130mod builder;
131pub(crate) mod caches;
132pub(crate) mod futures;
133pub(crate) mod thread_subscriptions;
134
135pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
136#[cfg(feature = "experimental-search")]
137use crate::search_index::SearchIndex;
138
139#[cfg(not(target_family = "wasm"))]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
141#[cfg(target_family = "wasm")]
142type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
143
144#[cfg(not(target_family = "wasm"))]
145type NotificationHandlerFn =
146 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
147#[cfg(target_family = "wasm")]
148type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
149
150/// Enum controlling if a loop running callbacks should continue or abort.
151///
152/// This is mainly used in the [`sync_with_callback`] method, the return value
153/// of the provided callback controls if the sync loop should be exited.
154///
155/// [`sync_with_callback`]: #method.sync_with_callback
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum LoopCtrl {
158 /// Continue running the loop.
159 Continue,
160 /// Break out of the loop.
161 Break,
162}
163
164/// Represents changes that can occur to a `Client`s `Session`.
165#[derive(Debug, Clone, PartialEq)]
166pub enum SessionChange {
167 /// The session's token is no longer valid.
168 UnknownToken {
169 /// Whether or not the session was soft logged out
170 soft_logout: bool,
171 },
172 /// The session's tokens have been refreshed.
173 TokensRefreshed,
174}
175
176/// Information about the server vendor obtained from the federation API.
177#[derive(Debug, Clone, PartialEq, Eq)]
178#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
179pub struct ServerVendorInfo {
180 /// The server name.
181 pub server_name: String,
182 /// The server version.
183 pub version: String,
184}
185
186/// An async/await enabled Matrix client.
187///
188/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
189#[derive(Clone)]
190pub struct Client {
191 pub(crate) inner: Arc<ClientInner>,
192}
193
194#[derive(Default)]
195pub(crate) struct ClientLocks {
196 /// Lock ensuring that only a single room may be marked as a DM at once.
197 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
198 /// explanation.
199 pub(crate) mark_as_dm_lock: Mutex<()>,
200
201 /// Lock ensuring that only a single secret store is getting opened at the
202 /// same time.
203 ///
204 /// This is important so we don't accidentally create multiple different new
205 /// default secret storage keys.
206 #[cfg(feature = "e2e-encryption")]
207 pub(crate) open_secret_store_lock: Mutex<()>,
208
209 /// Lock ensuring that we're only storing a single secret at a time.
210 ///
211 /// Take a look at the [`SecretStore::put_secret`] method for a more
212 /// detailed explanation.
213 ///
214 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
215 #[cfg(feature = "e2e-encryption")]
216 pub(crate) store_secret_lock: Mutex<()>,
217
218 /// Lock ensuring that only one method at a time might modify our backup.
219 #[cfg(feature = "e2e-encryption")]
220 pub(crate) backup_modify_lock: Mutex<()>,
221
222 /// Lock ensuring that we're going to attempt to upload backups for a single
223 /// requester.
224 #[cfg(feature = "e2e-encryption")]
225 pub(crate) backup_upload_lock: Mutex<()>,
226
227 /// Handler making sure we only have one group session sharing request in
228 /// flight per room.
229 #[cfg(feature = "e2e-encryption")]
230 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
231
232 /// Lock making sure we're only doing one key claim request at a time.
233 #[cfg(feature = "e2e-encryption")]
234 pub(crate) key_claim_lock: Mutex<()>,
235
236 /// Handler to ensure that only one members request is running at a time,
237 /// given a room.
238 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
239
240 /// Handler to ensure that only one encryption state request is running at a
241 /// time, given a room.
242 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
243
244 /// Deduplicating handler for sending read receipts. The string is an
245 /// internal implementation detail, see [`Self::send_single_receipt`].
246 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
247
248 #[cfg(feature = "e2e-encryption")]
249 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
250
251 /// Latest "generation" of data known by the crypto store.
252 ///
253 /// This is a counter that only increments, set in the database (and can
254 /// wrap). It's incremented whenever some process acquires a lock for the
255 /// first time. *This assumes the crypto store lock is being held, to
256 /// avoid data races on writing to this value in the store*.
257 ///
258 /// The current process will maintain this value in local memory and in the
259 /// DB over time. Observing a different value than the one read in
260 /// memory, when reading from the store indicates that somebody else has
261 /// written into the database under our feet.
262 ///
263 /// TODO: this should live in the `OlmMachine`, since it's information
264 /// related to the lock. As of today (2023-07-28), we blow up the entire
265 /// olm machine when there's a generation mismatch. So storing the
266 /// generation in the olm machine would make the client think there's
267 /// *always* a mismatch, and that's why we need to store the generation
268 /// outside the `OlmMachine`.
269 #[cfg(feature = "e2e-encryption")]
270 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
271}
272
273pub(crate) struct ClientInner {
274 /// All the data related to authentication and authorization.
275 pub(crate) auth_ctx: Arc<AuthCtx>,
276
277 /// The URL of the server.
278 ///
279 /// Not to be confused with the `Self::homeserver`. `server` is usually
280 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
281 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
282 /// homeserver (at the time of writing — 2024-08-28).
283 ///
284 /// This value is optional depending on how the `Client` has been built.
285 /// If it's been built from a homeserver URL directly, we don't know the
286 /// server. However, if the `Client` has been built from a server URL or
287 /// name, then the homeserver has been discovered, and we know both.
288 server: Option<Url>,
289
290 /// The URL of the homeserver to connect to.
291 ///
292 /// This is the URL for the client-server Matrix API.
293 homeserver: StdRwLock<Url>,
294
295 /// The sliding sync version.
296 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
297
298 /// The underlying HTTP client.
299 pub(crate) http_client: HttpClient,
300
301 /// User session data.
302 pub(super) base_client: BaseClient,
303
304 /// Collection of in-memory caches for the [`Client`].
305 pub(crate) caches: ClientCaches,
306
307 /// Collection of locks individual client methods might want to use, either
308 /// to ensure that only a single call to a method happens at once or to
309 /// deduplicate multiple calls to a method.
310 pub(crate) locks: ClientLocks,
311
312 /// The cross-process lock configuration.
313 ///
314 /// The SDK provides cross-process store locks (see
315 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
316 /// [`CrossProcessLockConfig::MultiProcess`] is used.
317 ///
318 /// If multiple `Client`s are running in different processes, this
319 /// value MUST be different for each `Client`.
320 cross_process_lock_config: CrossProcessLockConfig,
321
322 /// A mapping of the times at which the current user sent typing notices,
323 /// keyed by room.
324 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
325
326 /// Event handlers. See `add_event_handler`.
327 pub(crate) event_handlers: EventHandlerStore,
328
329 /// Notification handlers. See `register_notification_handler`.
330 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
331
332 /// The sender-side of channels used to receive room updates.
333 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
334
335 /// The sender-side of a channel used to observe all the room updates of a
336 /// sync response.
337 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
338
339 /// Whether the client should update its homeserver URL with the discovery
340 /// information present in the login response.
341 respect_login_well_known: bool,
342
343 /// An event that can be listened on to wait for a successful sync. The
344 /// event will only be fired if a sync loop is running. Can be used for
345 /// synchronization, e.g. if we send out a request to create a room, we can
346 /// wait for the sync to get the data to fetch a room object from the state
347 /// store.
348 pub(crate) sync_beat: event_listener::Event,
349
350 /// A central cache for events, inactive first.
351 ///
352 /// It becomes active when [`EventCache::subscribe`] is called.
353 pub(crate) event_cache: OnceCell<EventCache>,
354
355 /// End-to-end encryption related state.
356 #[cfg(feature = "e2e-encryption")]
357 pub(crate) e2ee: EncryptionData,
358
359 /// The verification state of our own device.
360 #[cfg(feature = "e2e-encryption")]
361 pub(crate) verification_state: SharedObservable<VerificationState>,
362
363 /// Whether to enable the experimental support for sending and receiving
364 /// encrypted room history on invite, per [MSC4268].
365 ///
366 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
367 #[cfg(feature = "e2e-encryption")]
368 pub(crate) enable_share_history_on_invite: bool,
369
370 /// Data related to the [`SendQueue`].
371 ///
372 /// [`SendQueue`]: crate::send_queue::SendQueue
373 pub(crate) send_queue_data: Arc<SendQueueData>,
374
375 /// The `max_upload_size` value of the homeserver, it contains the max
376 /// request size you can send.
377 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
378
379 /// The entry point to get the [`LatestEvent`] of rooms and threads.
380 ///
381 /// [`LatestEvent`]: crate::latest_event::LatestEvent
382 latest_events: OnceCell<LatestEvents>,
383
384 /// Service handling the catching up of thread subscriptions in the
385 /// background.
386 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
387
388 #[cfg(feature = "experimental-search")]
389 /// Handler for [`RoomIndex`]'s of each room
390 search_index: SearchIndex,
391
392 /// A monitor for background tasks spawned by the client.
393 pub(crate) task_monitor: TaskMonitor,
394
395 /// A sender to notify subscribers about duplicate key upload errors
396 /// triggered by requests to /keys/upload.
397 #[cfg(feature = "e2e-encryption")]
398 pub(crate) duplicate_key_upload_error_sender:
399 broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
400}
401
402impl ClientInner {
403 /// Create a new `ClientInner`.
404 ///
405 /// All the fields passed as parameters here are those that must be cloned
406 /// upon instantiation of a sub-client, e.g. a client specialized for
407 /// notifications.
408 #[allow(clippy::too_many_arguments)]
409 async fn new(
410 auth_ctx: Arc<AuthCtx>,
411 server: Option<Url>,
412 homeserver: Url,
413 sliding_sync_version: SlidingSyncVersion,
414 http_client: HttpClient,
415 base_client: BaseClient,
416 supported_versions: CachedValue<SupportedVersions>,
417 well_known: CachedValue<Option<WellKnownResponse>>,
418 respect_login_well_known: bool,
419 event_cache: OnceCell<EventCache>,
420 send_queue: Arc<SendQueueData>,
421 latest_events: OnceCell<LatestEvents>,
422 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
423 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
424 cross_process_lock_config: CrossProcessLockConfig,
425 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
426 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
427 ) -> Arc<Self> {
428 let caches = ClientCaches {
429 supported_versions: supported_versions.into(),
430 well_known: well_known.into(),
431 server_metadata: Mutex::new(TtlCache::new()),
432 };
433
434 let client = Self {
435 server,
436 homeserver: StdRwLock::new(homeserver),
437 auth_ctx,
438 sliding_sync_version: StdRwLock::new(sliding_sync_version),
439 http_client,
440 base_client,
441 caches,
442 locks: Default::default(),
443 cross_process_lock_config,
444 typing_notice_times: Default::default(),
445 event_handlers: Default::default(),
446 notification_handlers: Default::default(),
447 room_update_channels: Default::default(),
448 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
449 // ballast for all observers to catch up.
450 room_updates_sender: broadcast::Sender::new(32),
451 respect_login_well_known,
452 sync_beat: event_listener::Event::new(),
453 event_cache,
454 send_queue_data: send_queue,
455 latest_events,
456 #[cfg(feature = "e2e-encryption")]
457 e2ee: EncryptionData::new(encryption_settings),
458 #[cfg(feature = "e2e-encryption")]
459 verification_state: SharedObservable::new(VerificationState::Unknown),
460 #[cfg(feature = "e2e-encryption")]
461 enable_share_history_on_invite,
462 server_max_upload_size: Mutex::new(OnceCell::new()),
463 #[cfg(feature = "experimental-search")]
464 search_index: search_index_handler,
465 thread_subscription_catchup,
466 task_monitor: TaskMonitor::new(),
467 #[cfg(feature = "e2e-encryption")]
468 duplicate_key_upload_error_sender: broadcast::channel(1).0,
469 };
470
471 #[allow(clippy::let_and_return)]
472 let client = Arc::new(client);
473
474 #[cfg(feature = "e2e-encryption")]
475 client.e2ee.initialize_tasks(&client);
476
477 let _ = client
478 .event_cache
479 .get_or_init(|| async {
480 EventCache::new(&client, client.base_client.event_cache_store().clone())
481 })
482 .await;
483
484 let _ = client
485 .thread_subscription_catchup
486 .get_or_init(|| async {
487 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
488 })
489 .await;
490
491 client
492 }
493}
494
495#[cfg(not(tarpaulin_include))]
496impl Debug for Client {
497 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
498 write!(fmt, "Client")
499 }
500}
501
502impl Client {
503 /// Create a new [`Client`] that will use the given homeserver.
504 ///
505 /// # Arguments
506 ///
507 /// * `homeserver_url` - The homeserver that the client should connect to.
508 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
509 Self::builder().homeserver_url(homeserver_url).build().await
510 }
511
512 /// Returns a subscriber that publishes an event every time the ignore user
513 /// list changes.
514 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
515 self.inner.base_client.subscribe_to_ignore_user_list_changes()
516 }
517
518 /// Create a new [`ClientBuilder`].
519 pub fn builder() -> ClientBuilder {
520 ClientBuilder::new()
521 }
522
523 pub(crate) fn base_client(&self) -> &BaseClient {
524 &self.inner.base_client
525 }
526
527 /// The underlying HTTP client.
528 pub fn http_client(&self) -> &reqwest::Client {
529 &self.inner.http_client.inner
530 }
531
532 pub(crate) fn locks(&self) -> &ClientLocks {
533 &self.inner.locks
534 }
535
536 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
537 &self.inner.auth_ctx
538 }
539
540 /// The cross-process store lock configuration used by this [`Client`].
541 ///
542 /// The SDK provides cross-process store locks (see
543 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
544 /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
545 /// the value used for all cross-process store locks used by this
546 /// `Client`.
547 pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
548 &self.inner.cross_process_lock_config
549 }
550
551 /// Change the homeserver URL used by this client.
552 ///
553 /// # Arguments
554 ///
555 /// * `homeserver_url` - The new URL to use.
556 fn set_homeserver(&self, homeserver_url: Url) {
557 *self.inner.homeserver.write().unwrap() = homeserver_url;
558 }
559
560 /// Change to a different homeserver and re-resolve well-known.
561 #[cfg(feature = "e2e-encryption")]
562 pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
563 &self,
564 homeserver_url: Url,
565 ) -> Result<()> {
566 self.set_homeserver(homeserver_url);
567 self.reset_well_known().await?;
568 if let Some(well_known) = self.load_or_fetch_well_known().await? {
569 self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
570 }
571 Ok(())
572 }
573
574 /// Get the capabilities of the homeserver.
575 ///
576 /// This method should be used to check what features are supported by the
577 /// homeserver.
578 ///
579 /// # Examples
580 ///
581 /// ```no_run
582 /// # use matrix_sdk::Client;
583 /// # use url::Url;
584 /// # async {
585 /// # let homeserver = Url::parse("http://example.com")?;
586 /// let client = Client::new(homeserver).await?;
587 ///
588 /// let capabilities = client.get_capabilities().await?;
589 ///
590 /// if capabilities.change_password.enabled {
591 /// // Change password
592 /// }
593 /// # anyhow::Ok(()) };
594 /// ```
595 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
596 let res = self.send(get_capabilities::v3::Request::new()).await?;
597 Ok(res.capabilities)
598 }
599
600 /// Get the server vendor information from the federation API.
601 ///
602 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
603 /// both the server's software name and version.
604 ///
605 /// # Examples
606 ///
607 /// ```no_run
608 /// # use matrix_sdk::Client;
609 /// # use url::Url;
610 /// # async {
611 /// # let homeserver = Url::parse("http://example.com")?;
612 /// let client = Client::new(homeserver).await?;
613 ///
614 /// let server_info = client.server_vendor_info(None).await?;
615 /// println!(
616 /// "Server: {}, Version: {}",
617 /// server_info.server_name, server_info.version
618 /// );
619 /// # anyhow::Ok(()) };
620 /// ```
621 #[cfg(feature = "federation-api")]
622 pub async fn server_vendor_info(
623 &self,
624 request_config: Option<RequestConfig>,
625 ) -> HttpResult<ServerVendorInfo> {
626 use ruma::api::federation::discovery::get_server_version;
627
628 let res = self
629 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
630 .await?;
631
632 // Extract server info, using defaults if fields are missing.
633 let server = res.server.unwrap_or_default();
634 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
635 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
636
637 Ok(ServerVendorInfo { server_name: server_name_str, version })
638 }
639
640 /// Get a copy of the default request config.
641 ///
642 /// The default request config is what's used when sending requests if no
643 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
644 /// function with such a parameter.
645 ///
646 /// If the default request config was not customized through
647 /// [`ClientBuilder`] when creating this `Client`, the returned value will
648 /// be equivalent to [`RequestConfig::default()`].
649 pub fn request_config(&self) -> RequestConfig {
650 self.inner.http_client.request_config
651 }
652
653 /// Check whether the client has been activated.
654 ///
655 /// A client is considered active when:
656 ///
657 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
658 /// is logged in,
659 /// 2. Has loaded cached data from storage,
660 /// 3. If encryption is enabled, it also initialized or restored its
661 /// `OlmMachine`.
662 pub fn is_active(&self) -> bool {
663 self.inner.base_client.is_active()
664 }
665
666 /// The server used by the client.
667 ///
668 /// See `Self::server` to learn more.
669 pub fn server(&self) -> Option<&Url> {
670 self.inner.server.as_ref()
671 }
672
673 /// The homeserver of the client.
674 pub fn homeserver(&self) -> Url {
675 self.inner.homeserver.read().unwrap().clone()
676 }
677
678 /// Get the sliding sync version.
679 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
680 self.inner.sliding_sync_version.read().unwrap().clone()
681 }
682
683 /// Override the sliding sync version.
684 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
685 let mut lock = self.inner.sliding_sync_version.write().unwrap();
686 *lock = version;
687 }
688
689 /// Get the Matrix user session meta information.
690 ///
691 /// If the client is currently logged in, this will return a
692 /// [`SessionMeta`] object which contains the user ID and device ID.
693 /// Otherwise it returns `None`.
694 pub fn session_meta(&self) -> Option<&SessionMeta> {
695 self.base_client().session_meta()
696 }
697
698 /// Returns a receiver that gets events for each room info update. To watch
699 /// for new events, use `receiver.resubscribe()`.
700 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
701 self.base_client().room_info_notable_update_receiver()
702 }
703
704 /// Performs a search for users.
705 /// The search is performed case-insensitively on user IDs and display names
706 ///
707 /// # Arguments
708 ///
709 /// * `search_term` - The search term for the search
710 /// * `limit` - The maximum number of results to return. Defaults to 10.
711 ///
712 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
713 pub async fn search_users(
714 &self,
715 search_term: &str,
716 limit: u64,
717 ) -> HttpResult<search_users::v3::Response> {
718 let mut request = search_users::v3::Request::new(search_term.to_owned());
719
720 if let Some(limit) = UInt::new(limit) {
721 request.limit = limit;
722 }
723
724 self.send(request).await
725 }
726
727 /// Get the user id of the current owner of the client.
728 pub fn user_id(&self) -> Option<&UserId> {
729 self.session_meta().map(|s| s.user_id.as_ref())
730 }
731
732 /// Get the device ID that identifies the current session.
733 pub fn device_id(&self) -> Option<&DeviceId> {
734 self.session_meta().map(|s| s.device_id.as_ref())
735 }
736
737 /// Get the current access token for this session.
738 ///
739 /// Will be `None` if the client has not been logged in.
740 pub fn access_token(&self) -> Option<String> {
741 self.auth_ctx().access_token()
742 }
743
744 /// Get the current tokens for this session.
745 ///
746 /// To be notified of changes in the session tokens, use
747 /// [`Client::subscribe_to_session_changes()`] or
748 /// [`Client::set_session_callbacks()`].
749 ///
750 /// Returns `None` if the client has not been logged in.
751 pub fn session_tokens(&self) -> Option<SessionTokens> {
752 self.auth_ctx().session_tokens()
753 }
754
755 /// Access the authentication API used to log in this client.
756 ///
757 /// Will be `None` if the client has not been logged in.
758 pub fn auth_api(&self) -> Option<AuthApi> {
759 match self.auth_ctx().auth_data.get()? {
760 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
761 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
762 }
763 }
764
765 /// Get the whole session info of this client.
766 ///
767 /// Will be `None` if the client has not been logged in.
768 ///
769 /// Can be used with [`Client::restore_session`] to restore a previously
770 /// logged-in session.
771 pub fn session(&self) -> Option<AuthSession> {
772 match self.auth_api()? {
773 AuthApi::Matrix(api) => api.session().map(Into::into),
774 AuthApi::OAuth(api) => api.full_session().map(Into::into),
775 }
776 }
777
778 /// Get a reference to the state store.
779 pub fn state_store(&self) -> &DynStateStore {
780 self.base_client().state_store()
781 }
782
783 /// Get a reference to the event cache store.
784 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
785 self.base_client().event_cache_store()
786 }
787
788 /// Get a reference to the media store.
789 pub fn media_store(&self) -> &MediaStoreLock {
790 self.base_client().media_store()
791 }
792
793 /// Access the native Matrix authentication API with this client.
794 pub fn matrix_auth(&self) -> MatrixAuth {
795 MatrixAuth::new(self.clone())
796 }
797
798 /// Get the account of the current owner of the client.
799 pub fn account(&self) -> Account {
800 Account::new(self.clone())
801 }
802
803 /// Get the encryption manager of the client.
804 #[cfg(feature = "e2e-encryption")]
805 pub fn encryption(&self) -> Encryption {
806 Encryption::new(self.clone())
807 }
808
809 /// Get the media manager of the client.
810 pub fn media(&self) -> Media {
811 Media::new(self.clone())
812 }
813
814 /// Get the pusher manager of the client.
815 pub fn pusher(&self) -> Pusher {
816 Pusher::new(self.clone())
817 }
818
819 /// Access the OAuth 2.0 API of the client.
820 pub fn oauth(&self) -> OAuth {
821 OAuth::new(self.clone())
822 }
823
824 /// Register a handler for a specific event type.
825 ///
826 /// The handler is a function or closure with one or more arguments. The
827 /// first argument is the event itself. All additional arguments are
828 /// "context" arguments: They have to implement [`EventHandlerContext`].
829 /// This trait is named that way because most of the types implementing it
830 /// give additional context about an event: The room it was in, its raw form
831 /// and other similar things. As two exceptions to this,
832 /// [`Client`] and [`EventHandlerHandle`] also implement the
833 /// `EventHandlerContext` trait so you don't have to clone your client
834 /// into the event handler manually and a handler can decide to remove
835 /// itself.
836 ///
837 /// Some context arguments are not universally applicable. A context
838 /// argument that isn't available for the given event type will result in
839 /// the event handler being skipped and an error being logged. The following
840 /// context argument types are only available for a subset of event types:
841 ///
842 /// * [`Room`] is only available for room-specific events, i.e. not for
843 /// events like global account data events or presence events.
844 ///
845 /// You can provide custom context via
846 /// [`add_event_handler_context`](Client::add_event_handler_context) and
847 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
848 /// into the event handler.
849 ///
850 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
851 ///
852 /// # Examples
853 ///
854 /// ```no_run
855 /// use matrix_sdk::{
856 /// deserialized_responses::EncryptionInfo,
857 /// event_handler::Ctx,
858 /// ruma::{
859 /// events::{
860 /// macros::EventContent,
861 /// push_rules::PushRulesEvent,
862 /// room::{
863 /// message::SyncRoomMessageEvent,
864 /// topic::SyncRoomTopicEvent,
865 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
866 /// },
867 /// },
868 /// push::Action,
869 /// Int, MilliSecondsSinceUnixEpoch,
870 /// },
871 /// Client, Room,
872 /// };
873 /// use serde::{Deserialize, Serialize};
874 ///
875 /// # async fn example(client: Client) {
876 /// client.add_event_handler(
877 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
878 /// // Common usage: Room event plus room and client.
879 /// },
880 /// );
881 /// client.add_event_handler(
882 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
883 /// async move {
884 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
885 /// // unencrypted events and events that were decrypted by the SDK.
886 /// }
887 /// },
888 /// );
889 /// client.add_event_handler(
890 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
891 /// async move {
892 /// // A `Vec<Action>` parameter allows you to know which push actions
893 /// // are applicable for an event. For example, an event with
894 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
895 /// // in the timeline.
896 /// }
897 /// },
898 /// );
899 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
900 /// // You can omit any or all arguments after the first.
901 /// });
902 ///
903 /// // Registering a temporary event handler:
904 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
905 /// /* Event handler */
906 /// });
907 /// client.remove_event_handler(handle);
908 ///
909 /// // Registering custom event handler context:
910 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
911 /// struct MyContext {
912 /// number: usize,
913 /// }
914 /// client.add_event_handler_context(MyContext { number: 5 });
915 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
916 /// // Use the context
917 /// });
918 ///
919 /// // This will handle membership events in joined rooms. Invites are special, see below.
920 /// client.add_event_handler(
921 /// |ev: SyncRoomMemberEvent| async move {},
922 /// );
923 ///
924 /// // To handle state events in invited rooms (including invite membership events),
925 /// // `StrippedRoomMemberEvent` should be used.
926 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
927 /// client.add_event_handler(
928 /// |ev: StrippedRoomMemberEvent| async move {},
929 /// );
930 ///
931 /// // Custom events work exactly the same way, you just need to declare
932 /// // the content struct and use the EventContent derive macro on it.
933 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
934 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
935 /// struct TokenEventContent {
936 /// token: String,
937 /// #[serde(rename = "exp")]
938 /// expires_at: MilliSecondsSinceUnixEpoch,
939 /// }
940 ///
941 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
942 /// todo!("Display the token");
943 /// });
944 ///
945 /// // Event handler closures can also capture local variables.
946 /// // Make sure they are cheap to clone though, because they will be cloned
947 /// // every time the closure is called.
948 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
949 ///
950 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
951 /// println!("Calling the handler with identifier {data}");
952 /// });
953 /// # }
954 /// ```
955 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
956 where
957 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
958 H: EventHandler<Ev, Ctx>,
959 {
960 self.add_event_handler_impl(handler, None)
961 }
962
963 /// Register a handler for a specific room, and event type.
964 ///
965 /// This method works the same way as
966 /// [`add_event_handler`][Self::add_event_handler], except that the handler
967 /// will only be called for events in the room with the specified ID. See
968 /// that method for more details on event handler functions.
969 ///
970 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
971 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
972 /// your use case.
973 pub fn add_room_event_handler<Ev, Ctx, H>(
974 &self,
975 room_id: &RoomId,
976 handler: H,
977 ) -> EventHandlerHandle
978 where
979 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
980 H: EventHandler<Ev, Ctx>,
981 {
982 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
983 }
984
985 /// Observe a specific event type.
986 ///
987 /// `Ev` represents the kind of event that will be observed. `Ctx`
988 /// represents the context that will come with the event. It relies on the
989 /// same mechanism as [`Client::add_event_handler`]. The main difference is
990 /// that it returns an [`ObservableEventHandler`] and doesn't require a
991 /// user-defined closure. It is possible to subscribe to the
992 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
993 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
994 /// Ctx)`.
995 ///
996 /// Be careful that only the most recent value can be observed. Subscribers
997 /// are notified when a new value is sent, but there is no guarantee
998 /// that they will see all values.
999 ///
1000 /// # Example
1001 ///
1002 /// Let's see a classical usage:
1003 ///
1004 /// ```
1005 /// use futures_util::StreamExt as _;
1006 /// use matrix_sdk::{
1007 /// Client, Room,
1008 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
1009 /// };
1010 ///
1011 /// # async fn example(client: Client) -> Option<()> {
1012 /// let observer =
1013 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1014 ///
1015 /// let mut subscriber = observer.subscribe();
1016 ///
1017 /// let (event, (room, push_actions)) = subscriber.next().await?;
1018 /// # Some(())
1019 /// # }
1020 /// ```
1021 ///
1022 /// Now let's see how to get several contexts that can be useful for you:
1023 ///
1024 /// ```
1025 /// use matrix_sdk::{
1026 /// Client, Room,
1027 /// deserialized_responses::EncryptionInfo,
1028 /// ruma::{
1029 /// events::room::{
1030 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1031 /// },
1032 /// push::Action,
1033 /// },
1034 /// };
1035 ///
1036 /// # async fn example(client: Client) {
1037 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1038 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1039 ///
1040 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1041 /// // to distinguish between unencrypted events and events that were decrypted
1042 /// // by the SDK.
1043 /// let _ = client
1044 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1045 /// );
1046 ///
1047 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1048 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1049 /// // should be highlighted in the timeline.
1050 /// let _ =
1051 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1052 ///
1053 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1054 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1055 /// # }
1056 /// ```
1057 ///
1058 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1059 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1060 where
1061 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1062 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1063 {
1064 self.observe_room_events_impl(None)
1065 }
1066
1067 /// Observe a specific room, and event type.
1068 ///
1069 /// This method works the same way as [`Client::observe_events`], except
1070 /// that the observability will only be applied for events in the room with
1071 /// the specified ID. See that method for more details.
1072 ///
1073 /// Be careful that only the most recent value can be observed. Subscribers
1074 /// are notified when a new value is sent, but there is no guarantee
1075 /// that they will see all values.
1076 pub fn observe_room_events<Ev, Ctx>(
1077 &self,
1078 room_id: &RoomId,
1079 ) -> ObservableEventHandler<(Ev, Ctx)>
1080 where
1081 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1082 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1083 {
1084 self.observe_room_events_impl(Some(room_id.to_owned()))
1085 }
1086
1087 /// Shared implementation for `Client::observe_events` and
1088 /// `Client::observe_room_events`.
1089 fn observe_room_events_impl<Ev, Ctx>(
1090 &self,
1091 room_id: Option<OwnedRoomId>,
1092 ) -> ObservableEventHandler<(Ev, Ctx)>
1093 where
1094 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1095 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1096 {
1097 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1098 // new value.
1099 let shared_observable = SharedObservable::new(None);
1100
1101 ObservableEventHandler::new(
1102 shared_observable.clone(),
1103 self.event_handler_drop_guard(self.add_event_handler_impl(
1104 move |event: Ev, context: Ctx| {
1105 shared_observable.set(Some((event, context)));
1106
1107 ready(())
1108 },
1109 room_id,
1110 )),
1111 )
1112 }
1113
1114 /// Remove the event handler associated with the handle.
1115 ///
1116 /// Note that you **must not** call `remove_event_handler` from the
1117 /// non-async part of an event handler, that is:
1118 ///
1119 /// ```ignore
1120 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1121 /// // ⚠ this will cause a deadlock ⚠
1122 /// client.remove_event_handler(handle);
1123 ///
1124 /// async move {
1125 /// // removing the event handler here is fine
1126 /// client.remove_event_handler(handle);
1127 /// }
1128 /// })
1129 /// ```
1130 ///
1131 /// Note also that handlers that remove themselves will still execute with
1132 /// events received in the same sync cycle.
1133 ///
1134 /// # Arguments
1135 ///
1136 /// `handle` - The [`EventHandlerHandle`] that is returned when
1137 /// registering the event handler with [`Client::add_event_handler`].
1138 ///
1139 /// # Examples
1140 ///
1141 /// ```no_run
1142 /// # use url::Url;
1143 /// # use tokio::sync::mpsc;
1144 /// #
1145 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1146 /// #
1147 /// use matrix_sdk::{
1148 /// Client, event_handler::EventHandlerHandle,
1149 /// ruma::events::room::member::SyncRoomMemberEvent,
1150 /// };
1151 /// #
1152 /// # futures_executor::block_on(async {
1153 /// # let client = matrix_sdk::Client::builder()
1154 /// # .homeserver_url(homeserver)
1155 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1156 /// # .build()
1157 /// # .await
1158 /// # .unwrap();
1159 ///
1160 /// client.add_event_handler(
1161 /// |ev: SyncRoomMemberEvent,
1162 /// client: Client,
1163 /// handle: EventHandlerHandle| async move {
1164 /// // Common usage: Check arriving Event is the expected one
1165 /// println!("Expected RoomMemberEvent received!");
1166 /// client.remove_event_handler(handle);
1167 /// },
1168 /// );
1169 /// # });
1170 /// ```
1171 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1172 self.inner.event_handlers.remove(handle);
1173 }
1174
1175 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1176 /// the given handle.
1177 ///
1178 /// When the returned value is dropped, the event handler will be removed.
1179 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1180 EventHandlerDropGuard::new(handle, self.clone())
1181 }
1182
1183 /// Add an arbitrary value for use as event handler context.
1184 ///
1185 /// The value can be obtained in an event handler by adding an argument of
1186 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1187 ///
1188 /// If a value of the same type has been added before, it will be
1189 /// overwritten.
1190 ///
1191 /// # Examples
1192 ///
1193 /// ```no_run
1194 /// use matrix_sdk::{
1195 /// Room, event_handler::Ctx,
1196 /// ruma::events::room::message::SyncRoomMessageEvent,
1197 /// };
1198 /// # #[derive(Clone)]
1199 /// # struct SomeType;
1200 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1201 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1202 /// # futures_executor::block_on(async {
1203 /// # let client = matrix_sdk::Client::builder()
1204 /// # .homeserver_url(homeserver)
1205 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1206 /// # .build()
1207 /// # .await
1208 /// # .unwrap();
1209 ///
1210 /// // Handle used to send messages to the UI part of the app
1211 /// let my_gui_handle: SomeType = obtain_gui_handle();
1212 ///
1213 /// client.add_event_handler_context(my_gui_handle.clone());
1214 /// client.add_event_handler(
1215 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1216 /// async move {
1217 /// // gui_handle.send(DisplayMessage { message: ev });
1218 /// }
1219 /// },
1220 /// );
1221 /// # });
1222 /// ```
1223 pub fn add_event_handler_context<T>(&self, ctx: T)
1224 where
1225 T: Clone + Send + Sync + 'static,
1226 {
1227 self.inner.event_handlers.add_context(ctx);
1228 }
1229
1230 /// Register a handler for a notification.
1231 ///
1232 /// Similar to [`Client::add_event_handler`], but only allows functions
1233 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1234 /// [`Client`] for now.
1235 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1236 where
1237 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1238 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1239 {
1240 self.inner.notification_handlers.write().await.push(Box::new(
1241 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1242 ));
1243
1244 self
1245 }
1246
1247 /// Subscribe to all updates for the room with the given ID.
1248 ///
1249 /// The returned receiver will receive a new message for each sync response
1250 /// that contains updates for that room.
1251 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1252 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1253 btree_map::Entry::Vacant(entry) => {
1254 let (tx, rx) = broadcast::channel(8);
1255 entry.insert(tx);
1256 rx
1257 }
1258 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1259 }
1260 }
1261
1262 /// Subscribe to all updates to all rooms, whenever any has been received in
1263 /// a sync response.
1264 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1265 self.inner.room_updates_sender.subscribe()
1266 }
1267
1268 pub(crate) async fn notification_handlers(
1269 &self,
1270 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1271 self.inner.notification_handlers.read().await
1272 }
1273
1274 /// Get all the rooms the client knows about.
1275 ///
1276 /// This will return the list of joined, invited, and left rooms.
1277 pub fn rooms(&self) -> Vec<Room> {
1278 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1279 }
1280
1281 /// Get all the rooms the client knows about, filtered by room state.
1282 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1283 self.base_client()
1284 .rooms_filtered(filter)
1285 .into_iter()
1286 .map(|room| Room::new(self.clone(), room))
1287 .collect()
1288 }
1289
1290 /// Get a stream of all the rooms, in addition to the existing rooms.
1291 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1292 let (rooms, stream) = self.base_client().rooms_stream();
1293
1294 let map_room = |room| Room::new(self.clone(), room);
1295
1296 (
1297 rooms.into_iter().map(map_room).collect(),
1298 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1299 )
1300 }
1301
1302 /// Returns the joined rooms this client knows about.
1303 pub fn joined_rooms(&self) -> Vec<Room> {
1304 self.rooms_filtered(RoomStateFilter::JOINED)
1305 }
1306
1307 /// Returns the invited rooms this client knows about.
1308 pub fn invited_rooms(&self) -> Vec<Room> {
1309 self.rooms_filtered(RoomStateFilter::INVITED)
1310 }
1311
1312 /// Returns the left rooms this client knows about.
1313 pub fn left_rooms(&self) -> Vec<Room> {
1314 self.rooms_filtered(RoomStateFilter::LEFT)
1315 }
1316
1317 /// Returns the joined space rooms this client knows about.
1318 pub fn joined_space_rooms(&self) -> Vec<Room> {
1319 self.base_client()
1320 .rooms_filtered(RoomStateFilter::JOINED)
1321 .into_iter()
1322 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1323 .collect()
1324 }
1325
1326 /// Get a room with the given room id.
1327 ///
1328 /// # Arguments
1329 ///
1330 /// `room_id` - The unique id of the room that should be fetched.
1331 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1332 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1333 }
1334
1335 /// Gets the preview of a room, whether the current user has joined it or
1336 /// not.
1337 pub async fn get_room_preview(
1338 &self,
1339 room_or_alias_id: &RoomOrAliasId,
1340 via: Vec<OwnedServerName>,
1341 ) -> Result<RoomPreview> {
1342 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1343 Ok(room_id) => room_id.to_owned(),
1344 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1345 };
1346
1347 if let Some(room) = self.get_room(&room_id) {
1348 // The cached data can only be trusted if the room state is joined or
1349 // banned: for invite and knock rooms, no updates will be received
1350 // for the rooms after the invite/knock action took place so we may
1351 // have very out to date data for important fields such as
1352 // `join_rule`. For left rooms, the homeserver should return the latest info.
1353 match room.state() {
1354 RoomState::Joined | RoomState::Banned => {
1355 return Ok(RoomPreview::from_known_room(&room).await);
1356 }
1357 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1358 }
1359 }
1360
1361 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1362 }
1363
1364 /// Resolve a room alias to a room id and a list of servers which know
1365 /// about it.
1366 ///
1367 /// # Arguments
1368 ///
1369 /// `room_alias` - The room alias to be resolved.
1370 pub async fn resolve_room_alias(
1371 &self,
1372 room_alias: &RoomAliasId,
1373 ) -> HttpResult<get_alias::v3::Response> {
1374 let request = get_alias::v3::Request::new(room_alias.to_owned());
1375 self.send(request).await
1376 }
1377
1378 /// Checks if a room alias is not in use yet.
1379 ///
1380 /// Returns:
1381 /// - `Ok(true)` if the room alias is available.
1382 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1383 /// status code).
1384 /// - An `Err` otherwise.
1385 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1386 match self.resolve_room_alias(alias).await {
1387 // The room alias was resolved, so it's already in use.
1388 Ok(_) => Ok(false),
1389 Err(error) => {
1390 match error.client_api_error_kind() {
1391 // The room alias wasn't found, so it's available.
1392 Some(ErrorKind::NotFound) => Ok(true),
1393 _ => Err(error),
1394 }
1395 }
1396 }
1397 }
1398
1399 /// Adds a new room alias associated with a room to the room directory.
1400 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1401 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1402 self.send(request).await?;
1403 Ok(())
1404 }
1405
1406 /// Removes a room alias from the room directory.
1407 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1408 let request = delete_alias::v3::Request::new(alias.to_owned());
1409 self.send(request).await?;
1410 Ok(())
1411 }
1412
1413 /// Update the homeserver from the login response well-known if needed.
1414 ///
1415 /// # Arguments
1416 ///
1417 /// * `login_well_known` - The `well_known` field from a successful login
1418 /// response.
1419 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1420 if self.inner.respect_login_well_known
1421 && let Some(well_known) = login_well_known
1422 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1423 {
1424 self.set_homeserver(homeserver);
1425 }
1426 }
1427
1428 /// Similar to [`Client::restore_session_with`], with
1429 /// [`RoomLoadSettings::default()`].
1430 ///
1431 /// # Panics
1432 ///
1433 /// Panics if a session was already restored or logged in.
1434 #[instrument(skip_all)]
1435 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1436 self.restore_session_with(session, RoomLoadSettings::default()).await
1437 }
1438
1439 /// Restore a session previously logged-in using one of the available
1440 /// authentication APIs. The number of rooms to restore is controlled by
1441 /// [`RoomLoadSettings`].
1442 ///
1443 /// See the documentation of the corresponding authentication API's
1444 /// `restore_session` method for more information.
1445 ///
1446 /// # Panics
1447 ///
1448 /// Panics if a session was already restored or logged in.
1449 #[instrument(skip_all)]
1450 pub async fn restore_session_with(
1451 &self,
1452 session: impl Into<AuthSession>,
1453 room_load_settings: RoomLoadSettings,
1454 ) -> Result<()> {
1455 let session = session.into();
1456 match session {
1457 AuthSession::Matrix(session) => {
1458 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1459 }
1460 AuthSession::OAuth(session) => {
1461 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1462 }
1463 }
1464 }
1465
1466 /// Refresh the access token using the authentication API used to log into
1467 /// this session.
1468 ///
1469 /// See the documentation of the authentication API's `refresh_access_token`
1470 /// method for more information.
1471 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1472 let Some(auth_api) = self.auth_api() else {
1473 return Err(RefreshTokenError::RefreshTokenRequired);
1474 };
1475
1476 match auth_api {
1477 AuthApi::Matrix(api) => {
1478 trace!("Token refresh: Using the homeserver.");
1479 Box::pin(api.refresh_access_token()).await?;
1480 }
1481 AuthApi::OAuth(api) => {
1482 trace!("Token refresh: Using OAuth 2.0.");
1483 Box::pin(api.refresh_access_token()).await?;
1484 }
1485 }
1486
1487 Ok(())
1488 }
1489
1490 /// Log out the current session using the proper authentication API.
1491 ///
1492 /// # Errors
1493 ///
1494 /// Returns an error if the session is not authenticated or if an error
1495 /// occurred while making the request to the server.
1496 pub async fn logout(&self) -> Result<(), Error> {
1497 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1498 match auth_api {
1499 AuthApi::Matrix(matrix_auth) => {
1500 matrix_auth.logout().await?;
1501 Ok(())
1502 }
1503 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1504 }
1505 }
1506
1507 /// Get or upload a sync filter.
1508 ///
1509 /// This method will either get a filter ID from the store or upload the
1510 /// filter definition to the homeserver and return the new filter ID.
1511 ///
1512 /// # Arguments
1513 ///
1514 /// * `filter_name` - The unique name of the filter, this name will be used
1515 /// locally to store and identify the filter ID returned by the server.
1516 ///
1517 /// * `definition` - The filter definition that should be uploaded to the
1518 /// server if no filter ID can be found in the store.
1519 ///
1520 /// # Examples
1521 ///
1522 /// ```no_run
1523 /// # use matrix_sdk::{
1524 /// # Client, config::SyncSettings,
1525 /// # ruma::api::client::{
1526 /// # filter::{
1527 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1528 /// # },
1529 /// # sync::sync_events::v3::Filter,
1530 /// # }
1531 /// # };
1532 /// # use url::Url;
1533 /// # async {
1534 /// # let homeserver = Url::parse("http://example.com").unwrap();
1535 /// # let client = Client::new(homeserver).await.unwrap();
1536 /// let mut filter = FilterDefinition::default();
1537 ///
1538 /// // Let's enable member lazy loading.
1539 /// filter.room.state.lazy_load_options =
1540 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1541 ///
1542 /// let filter_id = client
1543 /// .get_or_upload_filter("sync", filter)
1544 /// .await
1545 /// .unwrap();
1546 ///
1547 /// let sync_settings = SyncSettings::new()
1548 /// .filter(Filter::FilterId(filter_id));
1549 ///
1550 /// let response = client.sync_once(sync_settings).await.unwrap();
1551 /// # };
1552 #[instrument(skip(self, definition))]
1553 pub async fn get_or_upload_filter(
1554 &self,
1555 filter_name: &str,
1556 definition: FilterDefinition,
1557 ) -> Result<String> {
1558 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1559 debug!("Found filter locally");
1560 Ok(filter)
1561 } else {
1562 debug!("Didn't find filter locally");
1563 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1564 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1565 let response = self.send(request).await?;
1566
1567 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1568
1569 Ok(response.filter_id)
1570 }
1571 }
1572
1573 /// Prepare to join a room by ID, by getting the current details about it
1574 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1575 let room = self.get_room(room_id)?;
1576
1577 let inviter = match room.invite_details().await {
1578 Ok(details) => details.inviter,
1579 Err(Error::WrongRoomState(_)) => None,
1580 Err(e) => {
1581 warn!("Error fetching invite details for room: {e:?}");
1582 None
1583 }
1584 };
1585
1586 Some(PreJoinRoomInfo { inviter })
1587 }
1588
1589 /// Finish joining a room.
1590 ///
1591 /// If the room was an invite that should be marked as a DM, will include it
1592 /// in the DM event after creating the joined room.
1593 ///
1594 /// If encrypted history sharing is enabled, will check to see if we have a
1595 /// key bundle, and import it if so.
1596 ///
1597 /// # Arguments
1598 ///
1599 /// * `room_id` - The `RoomId` of the room that was joined.
1600 /// * `pre_join_room_info` - Information about the room before we joined.
1601 async fn finish_join_room(
1602 &self,
1603 room_id: &RoomId,
1604 pre_join_room_info: Option<PreJoinRoomInfo>,
1605 ) -> Result<Room> {
1606 info!(?room_id, ?pre_join_room_info, "Completing room join");
1607 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1608 room.state() == RoomState::Invited
1609 && room.is_direct().await.unwrap_or_else(|e| {
1610 warn!(%room_id, "is_direct() failed: {e}");
1611 false
1612 })
1613 } else {
1614 false
1615 };
1616
1617 let base_room = self
1618 .base_client()
1619 .room_joined(
1620 room_id,
1621 pre_join_room_info
1622 .as_ref()
1623 .and_then(|info| info.inviter.as_ref())
1624 .map(|i| i.user_id().to_owned()),
1625 )
1626 .await?;
1627 let room = Room::new(self.clone(), base_room);
1628
1629 if mark_as_dm {
1630 room.set_is_direct(true).await?;
1631 }
1632
1633 // If we joined following an invite, check if we had previously received a key
1634 // bundle from the inviter, and import it if so.
1635 //
1636 // It's important that we only do this once `BaseClient::room_joined` has
1637 // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1638 // race.
1639 #[cfg(feature = "e2e-encryption")]
1640 if self.inner.enable_share_history_on_invite
1641 && let Some(inviter) =
1642 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1643 {
1644 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1645 .await?;
1646 }
1647
1648 // Suppress "unused variable" and "unused field" lints
1649 #[cfg(not(feature = "e2e-encryption"))]
1650 let _ = pre_join_room_info.map(|i| i.inviter);
1651
1652 Ok(room)
1653 }
1654
1655 /// Join a room by `RoomId`.
1656 ///
1657 /// Returns the `Room` in the joined state.
1658 ///
1659 /// # Arguments
1660 ///
1661 /// * `room_id` - The `RoomId` of the room to be joined.
1662 #[instrument(skip(self))]
1663 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1664 // See who invited us to this room, if anyone. Note we have to do this before
1665 // making the `/join` request, otherwise we could race against the sync.
1666 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1667
1668 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1669 let response = self.send(request).await?;
1670 self.finish_join_room(&response.room_id, pre_join_info).await
1671 }
1672
1673 /// Join a room by `RoomOrAliasId`.
1674 ///
1675 /// Returns the `Room` in the joined state.
1676 ///
1677 /// # Arguments
1678 ///
1679 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1680 /// alias looks like `#name:example.com`.
1681 /// * `server_names` - The server names to be used for resolving the alias,
1682 /// if needs be.
1683 #[instrument(skip(self))]
1684 pub async fn join_room_by_id_or_alias(
1685 &self,
1686 alias: &RoomOrAliasId,
1687 server_names: &[OwnedServerName],
1688 ) -> Result<Room> {
1689 let pre_join_info = {
1690 match alias.try_into() {
1691 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1692 Err(_) => {
1693 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1694 // responding to an invitation to the room, and therefore don't need to handle
1695 // things that happen as a result of invites.
1696 None
1697 }
1698 }
1699 };
1700 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1701 via: server_names.to_owned(),
1702 });
1703 let response = self.send(request).await?;
1704 self.finish_join_room(&response.room_id, pre_join_info).await
1705 }
1706
1707 /// Search the homeserver's directory of public rooms.
1708 ///
1709 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1710 /// a `get_public_rooms::Response`.
1711 ///
1712 /// # Arguments
1713 ///
1714 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1715 ///
1716 /// * `since` - Pagination token from a previous request.
1717 ///
1718 /// * `server` - The name of the server, if `None` the requested server is
1719 /// used.
1720 ///
1721 /// # Examples
1722 /// ```no_run
1723 /// use matrix_sdk::Client;
1724 /// # use url::Url;
1725 /// # let homeserver = Url::parse("http://example.com").unwrap();
1726 /// # let limit = Some(10);
1727 /// # let since = Some("since token");
1728 /// # let server = Some("servername.com".try_into().unwrap());
1729 /// # async {
1730 /// let mut client = Client::new(homeserver).await.unwrap();
1731 ///
1732 /// client.public_rooms(limit, since, server).await;
1733 /// # };
1734 /// ```
1735 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1736 pub async fn public_rooms(
1737 &self,
1738 limit: Option<u32>,
1739 since: Option<&str>,
1740 server: Option<&ServerName>,
1741 ) -> HttpResult<get_public_rooms::v3::Response> {
1742 let limit = limit.map(UInt::from);
1743
1744 let request = assign!(get_public_rooms::v3::Request::new(), {
1745 limit,
1746 since: since.map(ToOwned::to_owned),
1747 server: server.map(ToOwned::to_owned),
1748 });
1749 self.send(request).await
1750 }
1751
1752 /// Create a room with the given parameters.
1753 ///
1754 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1755 /// created room.
1756 ///
1757 /// If you want to create a direct message with one specific user, you can
1758 /// use [`create_dm`][Self::create_dm], which is more convenient than
1759 /// assembling the [`create_room::v3::Request`] yourself.
1760 ///
1761 /// If the `is_direct` field of the request is set to `true` and at least
1762 /// one user is invited, the room will be automatically added to the direct
1763 /// rooms in the account data.
1764 ///
1765 /// # Examples
1766 ///
1767 /// ```no_run
1768 /// use matrix_sdk::{
1769 /// Client,
1770 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1771 /// };
1772 /// # use url::Url;
1773 /// #
1774 /// # async {
1775 /// # let homeserver = Url::parse("http://example.com").unwrap();
1776 /// let request = CreateRoomRequest::new();
1777 /// let client = Client::new(homeserver).await.unwrap();
1778 /// assert!(client.create_room(request).await.is_ok());
1779 /// # };
1780 /// ```
1781 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1782 let invite = request.invite.clone();
1783 let is_direct_room = request.is_direct;
1784 let response = self.send(request).await?;
1785
1786 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1787
1788 let joined_room = Room::new(self.clone(), base_room);
1789
1790 if is_direct_room
1791 && !invite.is_empty()
1792 && let Err(error) =
1793 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1794 {
1795 // FIXME: Retry in the background
1796 error!("Failed to mark room as DM: {error}");
1797 }
1798
1799 Ok(joined_room)
1800 }
1801
1802 /// Create a DM room.
1803 ///
1804 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1805 /// given user being invited, the room marked `is_direct` and both the
1806 /// creator and invitee getting the default maximum power level.
1807 ///
1808 /// If the `e2e-encryption` feature is enabled, the room will also be
1809 /// encrypted.
1810 ///
1811 /// # Arguments
1812 ///
1813 /// * `user_id` - The ID of the user to create a DM for.
1814 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1815 #[cfg(feature = "e2e-encryption")]
1816 let initial_state = vec![
1817 InitialStateEvent::with_empty_state_key(
1818 RoomEncryptionEventContent::with_recommended_defaults(),
1819 )
1820 .to_raw_any(),
1821 ];
1822
1823 #[cfg(not(feature = "e2e-encryption"))]
1824 let initial_state = vec![];
1825
1826 let request = assign!(create_room::v3::Request::new(), {
1827 invite: vec![user_id.to_owned()],
1828 is_direct: true,
1829 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1830 initial_state,
1831 });
1832
1833 self.create_room(request).await
1834 }
1835
1836 /// Get the existing DM room with the given user, if any.
1837 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1838 let rooms = self.joined_rooms();
1839
1840 // Find the room we share with the `user_id` and only with `user_id`
1841 let room = rooms.into_iter().find(|r| {
1842 let targets = r.direct_targets();
1843 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1844 });
1845
1846 trace!(?user_id, ?room, "Found DM room with user");
1847 room
1848 }
1849
1850 /// Search the homeserver's directory for public rooms with a filter.
1851 ///
1852 /// # Arguments
1853 ///
1854 /// * `room_search` - The easiest way to create this request is using the
1855 /// `get_public_rooms_filtered::Request` itself.
1856 ///
1857 /// # Examples
1858 ///
1859 /// ```no_run
1860 /// # use url::Url;
1861 /// # use matrix_sdk::Client;
1862 /// # async {
1863 /// # let homeserver = Url::parse("http://example.com")?;
1864 /// use matrix_sdk::ruma::{
1865 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1866 /// };
1867 /// # let mut client = Client::new(homeserver).await?;
1868 ///
1869 /// let mut filter = Filter::new();
1870 /// filter.generic_search_term = Some("rust".to_owned());
1871 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1872 /// request.filter = filter;
1873 ///
1874 /// let response = client.public_rooms_filtered(request).await?;
1875 ///
1876 /// for room in response.chunk {
1877 /// println!("Found room {room:?}");
1878 /// }
1879 /// # anyhow::Ok(()) };
1880 /// ```
1881 pub async fn public_rooms_filtered(
1882 &self,
1883 request: get_public_rooms_filtered::v3::Request,
1884 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1885 self.send(request).await
1886 }
1887
1888 /// Send an arbitrary request to the server, without updating client state.
1889 ///
1890 /// **Warning:** Because this method *does not* update the client state, it
1891 /// is important to make sure that you account for this yourself, and
1892 /// use wrapper methods where available. This method should *only* be
1893 /// used if a wrapper method for the endpoint you'd like to use is not
1894 /// available.
1895 ///
1896 /// # Arguments
1897 ///
1898 /// * `request` - A filled out and valid request for the endpoint to be hit
1899 ///
1900 /// * `timeout` - An optional request timeout setting, this overrides the
1901 /// default request setting if one was set.
1902 ///
1903 /// # Examples
1904 ///
1905 /// ```no_run
1906 /// # use matrix_sdk::{Client, config::SyncSettings};
1907 /// # use url::Url;
1908 /// # async {
1909 /// # let homeserver = Url::parse("http://localhost:8080")?;
1910 /// # let mut client = Client::new(homeserver).await?;
1911 /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1912 ///
1913 /// // First construct the request you want to make
1914 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1915 /// // for all available Endpoints
1916 /// let user_id = owned_user_id!("@example:localhost");
1917 /// let request = profile::get_profile::v3::Request::new(user_id);
1918 ///
1919 /// // Start the request using Client::send()
1920 /// let response = client.send(request).await?;
1921 ///
1922 /// // Check the corresponding Response struct to find out what types are
1923 /// // returned
1924 /// # anyhow::Ok(()) };
1925 /// ```
1926 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1927 where
1928 Request: OutgoingRequest + Clone + Debug,
1929 for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1930 Request::PathBuilder: SupportedPathBuilder,
1931 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1932 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1933 {
1934 SendRequest {
1935 client: self.clone(),
1936 request,
1937 config: None,
1938 send_progress: Default::default(),
1939 }
1940 }
1941
1942 pub(crate) async fn send_inner<Request>(
1943 &self,
1944 request: Request,
1945 config: Option<RequestConfig>,
1946 send_progress: SharedObservable<TransmissionProgress>,
1947 ) -> HttpResult<Request::IncomingResponse>
1948 where
1949 Request: OutgoingRequest + Debug,
1950 for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1951 Request::PathBuilder: SupportedPathBuilder,
1952 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1953 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1954 {
1955 let homeserver = self.homeserver().to_string();
1956 let access_token = self.access_token();
1957 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1958
1959 let path_builder_input =
1960 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1961
1962 let result = self
1963 .inner
1964 .http_client
1965 .send(
1966 request,
1967 config,
1968 homeserver,
1969 access_token.as_deref(),
1970 path_builder_input,
1971 send_progress,
1972 )
1973 .await;
1974
1975 if let Err(Some(ErrorKind::UnknownToken { .. })) =
1976 result.as_ref().map_err(HttpError::client_api_error_kind)
1977 && let Some(access_token) = &access_token
1978 {
1979 // Mark the access token as expired.
1980 self.auth_ctx().set_access_token_expired(access_token);
1981 }
1982
1983 result
1984 }
1985
1986 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1987 _ = self
1988 .inner
1989 .auth_ctx
1990 .session_change_sender
1991 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1992 }
1993
1994 /// Fetches server versions from network; no caching.
1995 pub async fn fetch_server_versions(
1996 &self,
1997 request_config: Option<RequestConfig>,
1998 ) -> HttpResult<get_supported_versions::Response> {
1999 // Since this was called by the user, try to refresh the access token if
2000 // necessary.
2001 self.fetch_server_versions_inner(false, request_config).await
2002 }
2003
2004 /// Fetches server versions from network; no caching.
2005 ///
2006 /// If the access token is expired and `failsafe` is `false`, this will
2007 /// attempt to refresh the access token, otherwise this will try to make an
2008 /// unauthenticated request instead.
2009 pub(crate) async fn fetch_server_versions_inner(
2010 &self,
2011 failsafe: bool,
2012 request_config: Option<RequestConfig>,
2013 ) -> HttpResult<get_supported_versions::Response> {
2014 if !failsafe {
2015 // `Client::send()` handles refreshing access tokens.
2016 return self
2017 .send(get_supported_versions::Request::new())
2018 .with_request_config(request_config)
2019 .await;
2020 }
2021
2022 let homeserver = self.homeserver().to_string();
2023
2024 // If we have a fresh access token, try with it first.
2025 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2026 && self.auth_ctx().has_valid_access_token()
2027 && let Some(access_token) = self.access_token()
2028 {
2029 let result = self
2030 .inner
2031 .http_client
2032 .send(
2033 get_supported_versions::Request::new(),
2034 request_config,
2035 homeserver.clone(),
2036 Some(&access_token),
2037 (),
2038 Default::default(),
2039 )
2040 .await;
2041
2042 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2043 result.as_ref().map_err(HttpError::client_api_error_kind)
2044 {
2045 // If the access token is actually expired, mark it as expired and fallback to
2046 // the unauthenticated request below.
2047 self.auth_ctx().set_access_token_expired(&access_token);
2048 } else {
2049 // If the request succeeded or it's an other error, just stop now.
2050 return result;
2051 }
2052 }
2053
2054 // Try without authentication.
2055 self.inner
2056 .http_client
2057 .send(
2058 get_supported_versions::Request::new(),
2059 request_config,
2060 homeserver.clone(),
2061 None,
2062 (),
2063 Default::default(),
2064 )
2065 .await
2066 }
2067
2068 /// Fetches client well_known from network; no caching.
2069 ///
2070 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2071 /// well-known contents.
2072 /// 2. If it's not, we try extracting the server name from the
2073 /// [`Client::user_id`] and building the server URL from it.
2074 /// 3. If we couldn't get the well-known contents with either the explicit
2075 /// server name or the implicit extracted one, we try the homeserver URL
2076 /// as a last resort.
2077 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2078 let homeserver = self.homeserver();
2079 let scheme = homeserver.scheme();
2080
2081 // Use the server name, either an explicit one or an implicit one taken from
2082 // the user id: sometimes we'll have only the homeserver url available and no
2083 // server name, but the server name can be extracted from the current user id.
2084 let server_url = self
2085 .server()
2086 .map(|server| server.to_string())
2087 // If the server name wasn't available, extract it from the user id and build a URL:
2088 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2089 // will be the same for the public server url, lacking a better candidate.
2090 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2091
2092 // If the server name is available, first try using it
2093 let response = if let Some(server_url) = server_url {
2094 // First try using the server name
2095 self.fetch_client_well_known_with_url(server_url).await
2096 } else {
2097 None
2098 };
2099
2100 // If we didn't get a well-known value yet, try with the homeserver url instead:
2101 if response.is_none() {
2102 // Sometimes people configure their well-known directly on the homeserver so use
2103 // this as a fallback when the server name is unknown.
2104 warn!(
2105 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2106 );
2107 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2108 } else {
2109 response
2110 }
2111 }
2112
2113 async fn fetch_client_well_known_with_url(
2114 &self,
2115 url: String,
2116 ) -> Option<discover_homeserver::Response> {
2117 let well_known = self
2118 .inner
2119 .http_client
2120 .send(
2121 discover_homeserver::Request::new(),
2122 Some(RequestConfig::short_retry()),
2123 url,
2124 None,
2125 (),
2126 Default::default(),
2127 )
2128 .await;
2129
2130 match well_known {
2131 Ok(well_known) => Some(well_known),
2132 Err(http_error) => {
2133 // It is perfectly valid to not have a well-known file.
2134 // Maybe we should check for a specific error code to be sure?
2135 warn!("Failed to fetch client well-known: {http_error}");
2136 None
2137 }
2138 }
2139 }
2140
2141 /// Load supported versions from storage, or fetch them from network and
2142 /// cache them.
2143 ///
2144 /// If `failsafe` is true, this will try to minimize side effects to avoid
2145 /// possible deadlocks.
2146 async fn load_or_fetch_supported_versions(
2147 &self,
2148 failsafe: bool,
2149 ) -> HttpResult<SupportedVersionsResponse> {
2150 match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2151 Ok(Some(stored)) => {
2152 if let Some(supported_versions) =
2153 stored.into_supported_versions().and_then(|value| value.into_data())
2154 {
2155 return Ok(supported_versions);
2156 }
2157 }
2158 Ok(None) => {
2159 // fallthrough: cache is empty
2160 }
2161 Err(err) => {
2162 warn!("error when loading cached supported versions: {err}");
2163 // fallthrough to network.
2164 }
2165 }
2166
2167 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2168 let supported_versions = SupportedVersionsResponse {
2169 versions: server_versions.versions,
2170 unstable_features: server_versions.unstable_features,
2171 };
2172
2173 // Only attempt to cache the result in storage if the request was authenticated.
2174 if self.auth_ctx().has_valid_access_token()
2175 && let Err(err) = self
2176 .state_store()
2177 .set_kv_data(
2178 StateStoreDataKey::SupportedVersions,
2179 StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2180 supported_versions.clone(),
2181 )),
2182 )
2183 .await
2184 {
2185 warn!("error when caching supported versions: {err}");
2186 }
2187
2188 Ok(supported_versions)
2189 }
2190
2191 pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2192 let supported_versions = &self.inner.caches.supported_versions;
2193
2194 if let CachedValue::Cached(val) = &*supported_versions.read().await {
2195 Some(val.clone())
2196 } else {
2197 None
2198 }
2199 }
2200
2201 /// Get the Matrix versions and features supported by the homeserver by
2202 /// fetching them from the server or the cache.
2203 ///
2204 /// This is equivalent to calling both [`Client::server_versions()`] and
2205 /// [`Client::unstable_features()`]. To always fetch the result from the
2206 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2207 /// and then `.as_supported_versions()` on the response.
2208 ///
2209 /// # Examples
2210 ///
2211 /// ```no_run
2212 /// use ruma::api::{FeatureFlag, MatrixVersion};
2213 /// # use matrix_sdk::{Client, config::SyncSettings};
2214 /// # use url::Url;
2215 /// # async {
2216 /// # let homeserver = Url::parse("http://localhost:8080")?;
2217 /// # let mut client = Client::new(homeserver).await?;
2218 ///
2219 /// let supported = client.supported_versions().await?;
2220 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2221 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2222 ///
2223 /// let msc_x_feature = FeatureFlag::from("msc_x");
2224 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2225 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2226 /// # anyhow::Ok(()) };
2227 /// ```
2228 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2229 self.supported_versions_inner(false).await
2230 }
2231
2232 /// Get the Matrix versions and features supported by the homeserver by
2233 /// fetching them from the server or the cache.
2234 ///
2235 /// If `failsafe` is true, this will try to minimize side effects to avoid
2236 /// possible deadlocks.
2237 pub(crate) async fn supported_versions_inner(
2238 &self,
2239 failsafe: bool,
2240 ) -> HttpResult<SupportedVersions> {
2241 let cached_supported_versions = &self.inner.caches.supported_versions;
2242 if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2243 return Ok(val.clone());
2244 }
2245
2246 let mut guarded_supported_versions = cached_supported_versions.write().await;
2247 if let CachedValue::Cached(val) = &*guarded_supported_versions {
2248 return Ok(val.clone());
2249 }
2250
2251 let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2252
2253 // Fill both unstable features and server versions at once.
2254 let mut supported_versions = supported.supported_versions();
2255 if supported_versions.versions.is_empty() {
2256 supported_versions.versions = [MatrixVersion::V1_0].into();
2257 }
2258
2259 // Only cache the result if the request was authenticated.
2260 if self.auth_ctx().has_valid_access_token() {
2261 *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2262 }
2263
2264 Ok(supported_versions)
2265 }
2266
2267 /// Get the Matrix versions and features supported by the homeserver by
2268 /// fetching them from the cache.
2269 ///
2270 /// For a version of this function that fetches the supported versions and
2271 /// features from the homeserver if the [`SupportedVersions`] aren't
2272 /// found in the cache, take a look at the [`Client::server_versions()`]
2273 /// method.
2274 ///
2275 /// # Examples
2276 ///
2277 /// ```no_run
2278 /// use ruma::api::{FeatureFlag, MatrixVersion};
2279 /// # use matrix_sdk::{Client, config::SyncSettings};
2280 /// # use url::Url;
2281 /// # async {
2282 /// # let homeserver = Url::parse("http://localhost:8080")?;
2283 /// # let mut client = Client::new(homeserver).await?;
2284 ///
2285 /// let supported =
2286 /// if let Some(supported) = client.supported_versions_cached().await? {
2287 /// supported
2288 /// } else {
2289 /// client.fetch_server_versions(None).await?.as_supported_versions()
2290 /// };
2291 ///
2292 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2293 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2294 ///
2295 /// let msc_x_feature = FeatureFlag::from("msc_x");
2296 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2297 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2298 /// # anyhow::Ok(()) };
2299 /// ```
2300 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2301 if let Some(cached) = self.get_cached_supported_versions().await {
2302 Ok(Some(cached))
2303 } else if let Some(stored) =
2304 self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2305 {
2306 if let Some(supported) =
2307 stored.into_supported_versions().and_then(|value| value.into_data())
2308 {
2309 Ok(Some(supported.supported_versions()))
2310 } else {
2311 Ok(None)
2312 }
2313 } else {
2314 Ok(None)
2315 }
2316 }
2317
2318 /// Get the Matrix versions supported by the homeserver by fetching them
2319 /// from the server or the cache.
2320 ///
2321 /// # Examples
2322 ///
2323 /// ```no_run
2324 /// use ruma::api::MatrixVersion;
2325 /// # use matrix_sdk::{Client, config::SyncSettings};
2326 /// # use url::Url;
2327 /// # async {
2328 /// # let homeserver = Url::parse("http://localhost:8080")?;
2329 /// # let mut client = Client::new(homeserver).await?;
2330 ///
2331 /// let server_versions = client.server_versions().await?;
2332 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2333 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2334 /// # anyhow::Ok(()) };
2335 /// ```
2336 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2337 Ok(self.supported_versions().await?.versions)
2338 }
2339
2340 /// Get the unstable features supported by the homeserver by fetching them
2341 /// from the server or the cache.
2342 ///
2343 /// # Examples
2344 ///
2345 /// ```no_run
2346 /// use matrix_sdk::ruma::api::FeatureFlag;
2347 /// # use matrix_sdk::{Client, config::SyncSettings};
2348 /// # use url::Url;
2349 /// # async {
2350 /// # let homeserver = Url::parse("http://localhost:8080")?;
2351 /// # let mut client = Client::new(homeserver).await?;
2352 ///
2353 /// let msc_x_feature = FeatureFlag::from("msc_x");
2354 /// let unstable_features = client.unstable_features().await?;
2355 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2356 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2357 /// # anyhow::Ok(()) };
2358 /// ```
2359 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2360 Ok(self.supported_versions().await?.features)
2361 }
2362
2363 /// Empty the supported versions and unstable features cache.
2364 ///
2365 /// Since the SDK caches the supported versions, it's possible to have a
2366 /// stale entry in the cache. This functions makes it possible to force
2367 /// reset it.
2368 pub async fn reset_supported_versions(&self) -> Result<()> {
2369 // Empty the in-memory caches.
2370 self.inner.caches.supported_versions.write().await.take();
2371
2372 // Empty the store cache.
2373 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2374 }
2375
2376 /// Load well-known from storage, or fetch it from network and cache it.
2377 async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2378 match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2379 Ok(Some(stored)) => {
2380 if let Some(well_known) =
2381 stored.into_well_known().and_then(|value| value.into_data())
2382 {
2383 return Ok(well_known);
2384 }
2385 }
2386 Ok(None) => {
2387 // fallthrough: cache is empty
2388 }
2389 Err(err) => {
2390 warn!("error when loading cached well-known: {err}");
2391 // fallthrough to network.
2392 }
2393 }
2394
2395 let well_known = self.fetch_client_well_known().await.map(Into::into);
2396
2397 // Attempt to cache the result in storage.
2398 if let Err(err) = self
2399 .state_store()
2400 .set_kv_data(
2401 StateStoreDataKey::WellKnown,
2402 StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2403 )
2404 .await
2405 {
2406 warn!("error when caching well-known: {err}");
2407 }
2408
2409 Ok(well_known)
2410 }
2411
2412 async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2413 let well_known = &self.inner.caches.well_known;
2414 if let CachedValue::Cached(val) = &*well_known.read().await {
2415 return Ok(val.clone());
2416 }
2417
2418 let mut guarded_well_known = well_known.write().await;
2419 if let CachedValue::Cached(val) = &*guarded_well_known {
2420 return Ok(val.clone());
2421 }
2422
2423 let well_known = self.load_or_fetch_well_known().await?;
2424
2425 *guarded_well_known = CachedValue::Cached(well_known.clone());
2426
2427 Ok(well_known)
2428 }
2429
2430 /// Get information about the homeserver's advertised RTC foci by fetching
2431 /// the well-known file from the server or the cache.
2432 ///
2433 /// # Examples
2434 /// ```no_run
2435 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2436 /// # use url::Url;
2437 /// # async {
2438 /// # let homeserver = Url::parse("http://localhost:8080")?;
2439 /// # let mut client = Client::new(homeserver).await?;
2440 /// let rtc_foci = client.rtc_foci().await?;
2441 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2442 /// RtcFocusInfo::LiveKit(info) => Some(info),
2443 /// _ => None,
2444 /// });
2445 /// if let Some(info) = default_livekit_focus_info {
2446 /// println!("Default LiveKit service URL: {}", info.service_url);
2447 /// }
2448 /// # anyhow::Ok(()) };
2449 /// ```
2450 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2451 let well_known = self.get_or_load_and_cache_well_known().await?;
2452
2453 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2454 }
2455
2456 /// Empty the well-known cache.
2457 ///
2458 /// Since the SDK caches the well-known, it's possible to have a stale entry
2459 /// in the cache. This functions makes it possible to force reset it.
2460 pub async fn reset_well_known(&self) -> Result<()> {
2461 // Empty the in-memory caches.
2462 self.inner.caches.well_known.write().await.take();
2463
2464 // Empty the store cache.
2465 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2466 }
2467
2468 /// Check whether MSC 4028 is enabled on the homeserver.
2469 ///
2470 /// # Examples
2471 ///
2472 /// ```no_run
2473 /// # use matrix_sdk::{Client, config::SyncSettings};
2474 /// # use url::Url;
2475 /// # async {
2476 /// # let homeserver = Url::parse("http://localhost:8080")?;
2477 /// # let mut client = Client::new(homeserver).await?;
2478 /// let msc4028_enabled =
2479 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2480 /// # anyhow::Ok(()) };
2481 /// ```
2482 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2483 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2484 }
2485
2486 /// Get information of all our own devices.
2487 ///
2488 /// # Examples
2489 ///
2490 /// ```no_run
2491 /// # use matrix_sdk::{Client, config::SyncSettings};
2492 /// # use url::Url;
2493 /// # async {
2494 /// # let homeserver = Url::parse("http://localhost:8080")?;
2495 /// # let mut client = Client::new(homeserver).await?;
2496 /// let response = client.devices().await?;
2497 ///
2498 /// for device in response.devices {
2499 /// println!(
2500 /// "Device: {} {}",
2501 /// device.device_id,
2502 /// device.display_name.as_deref().unwrap_or("")
2503 /// );
2504 /// }
2505 /// # anyhow::Ok(()) };
2506 /// ```
2507 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2508 let request = get_devices::v3::Request::new();
2509
2510 self.send(request).await
2511 }
2512
2513 /// Delete the given devices from the server.
2514 ///
2515 /// # Arguments
2516 ///
2517 /// * `devices` - The list of devices that should be deleted from the
2518 /// server.
2519 ///
2520 /// * `auth_data` - This request requires user interactive auth, the first
2521 /// request needs to set this to `None` and will always fail with an
2522 /// `UiaaResponse`. The response will contain information for the
2523 /// interactive auth and the same request needs to be made but this time
2524 /// with some `auth_data` provided.
2525 ///
2526 /// ```no_run
2527 /// # use matrix_sdk::{
2528 /// # ruma::{api::client::uiaa, owned_device_id},
2529 /// # Client, Error, config::SyncSettings,
2530 /// # };
2531 /// # use serde_json::json;
2532 /// # use url::Url;
2533 /// # use std::collections::BTreeMap;
2534 /// # async {
2535 /// # let homeserver = Url::parse("http://localhost:8080")?;
2536 /// # let mut client = Client::new(homeserver).await?;
2537 /// let devices = &[owned_device_id!("DEVICEID")];
2538 ///
2539 /// if let Err(e) = client.delete_devices(devices, None).await {
2540 /// if let Some(info) = e.as_uiaa_response() {
2541 /// let mut password = uiaa::Password::new(
2542 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2543 /// "wordpass".to_owned(),
2544 /// );
2545 /// password.session = info.session.clone();
2546 ///
2547 /// client
2548 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2549 /// .await?;
2550 /// }
2551 /// }
2552 /// # anyhow::Ok(()) };
2553 pub async fn delete_devices(
2554 &self,
2555 devices: &[OwnedDeviceId],
2556 auth_data: Option<uiaa::AuthData>,
2557 ) -> HttpResult<delete_devices::v3::Response> {
2558 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2559 request.auth = auth_data;
2560
2561 self.send(request).await
2562 }
2563
2564 /// Change the display name of a device owned by the current user.
2565 ///
2566 /// Returns a `update_device::Response` which specifies the result
2567 /// of the operation.
2568 ///
2569 /// # Arguments
2570 ///
2571 /// * `device_id` - The ID of the device to change the display name of.
2572 /// * `display_name` - The new display name to set.
2573 pub async fn rename_device(
2574 &self,
2575 device_id: &DeviceId,
2576 display_name: &str,
2577 ) -> HttpResult<update_device::v3::Response> {
2578 let mut request = update_device::v3::Request::new(device_id.to_owned());
2579 request.display_name = Some(display_name.to_owned());
2580
2581 self.send(request).await
2582 }
2583
2584 /// Check whether a device with a specific ID exists on the server.
2585 ///
2586 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2587 /// with 404 and the underlying error otherwise.
2588 ///
2589 /// # Arguments
2590 ///
2591 /// * `device_id` - The ID of the device to query.
2592 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2593 let request = device::get_device::v3::Request::new(device_id);
2594 match self.send(request).await {
2595 Ok(_) => Ok(true),
2596 Err(err) => {
2597 if let Some(error) = err.as_client_api_error()
2598 && error.status_code == 404
2599 {
2600 Ok(false)
2601 } else {
2602 Err(err.into())
2603 }
2604 }
2605 }
2606 }
2607
2608 /// Synchronize the client's state with the latest state on the server.
2609 ///
2610 /// ## Syncing Events
2611 ///
2612 /// Messages or any other type of event need to be periodically fetched from
2613 /// the server, this is achieved by sending a `/sync` request to the server.
2614 ///
2615 /// The first sync is sent out without a [`token`]. The response of the
2616 /// first sync will contain a [`next_batch`] field which should then be
2617 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2618 /// don't receive the same events multiple times.
2619 ///
2620 /// ## Long Polling
2621 ///
2622 /// A sync should in the usual case always be in flight. The
2623 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2624 /// long the server will wait for new events before it will respond.
2625 /// The server will respond immediately if some new events arrive before the
2626 /// timeout has expired. If no changes arrive and the timeout expires an
2627 /// empty sync response will be sent to the client.
2628 ///
2629 /// This method of sending a request that may not receive a response
2630 /// immediately is called long polling.
2631 ///
2632 /// ## Filtering Events
2633 ///
2634 /// The number or type of messages and events that the client should receive
2635 /// from the server can be altered using a [`Filter`].
2636 ///
2637 /// Filters can be non-trivial and, since they will be sent with every sync
2638 /// request, they may take up a bunch of unnecessary bandwidth.
2639 ///
2640 /// Luckily filters can be uploaded to the server and reused using an unique
2641 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2642 /// method.
2643 ///
2644 /// # Arguments
2645 ///
2646 /// * `sync_settings` - Settings for the sync call, this allows us to set
2647 /// various options to configure the sync:
2648 /// * [`filter`] - To configure which events we receive and which get
2649 /// [filtered] by the server
2650 /// * [`timeout`] - To configure our [long polling] setup.
2651 /// * [`token`] - To tell the server which events we already received
2652 /// and where we wish to continue syncing.
2653 /// * [`full_state`] - To tell the server that we wish to receive all
2654 /// state events, regardless of our configured [`token`].
2655 /// * [`set_presence`] - To tell the server to set the presence and to
2656 /// which state.
2657 ///
2658 /// # Examples
2659 ///
2660 /// ```no_run
2661 /// # use url::Url;
2662 /// # async {
2663 /// # let homeserver = Url::parse("http://localhost:8080")?;
2664 /// # let username = "";
2665 /// # let password = "";
2666 /// use matrix_sdk::{
2667 /// Client, config::SyncSettings,
2668 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2669 /// };
2670 ///
2671 /// let client = Client::new(homeserver).await?;
2672 /// client.matrix_auth().login_username(username, password).send().await?;
2673 ///
2674 /// // Sync once so we receive the client state and old messages.
2675 /// client.sync_once(SyncSettings::default()).await?;
2676 ///
2677 /// // Register our handler so we start responding once we receive a new
2678 /// // event.
2679 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2680 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2681 /// });
2682 ///
2683 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2684 /// // from our `sync_once()` call automatically.
2685 /// client.sync(SyncSettings::default()).await;
2686 /// # anyhow::Ok(()) };
2687 /// ```
2688 ///
2689 /// [`sync`]: #method.sync
2690 /// [`SyncSettings`]: crate::config::SyncSettings
2691 /// [`token`]: crate::config::SyncSettings#method.token
2692 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2693 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2694 /// [`set_presence`]: ruma::presence::PresenceState
2695 /// [`filter`]: crate::config::SyncSettings#method.filter
2696 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2697 /// [`next_batch`]: SyncResponse#structfield.next_batch
2698 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2699 /// [long polling]: #long-polling
2700 /// [filtered]: #filtering-events
2701 #[instrument(skip(self))]
2702 pub async fn sync_once(
2703 &self,
2704 sync_settings: crate::config::SyncSettings,
2705 ) -> Result<SyncResponse> {
2706 // The sync might not return for quite a while due to the timeout.
2707 // We'll see if there's anything crypto related to send out before we
2708 // sync, i.e. if we closed our client after a sync but before the
2709 // crypto requests were sent out.
2710 //
2711 // This will mostly be a no-op.
2712 #[cfg(feature = "e2e-encryption")]
2713 if let Err(e) = self.send_outgoing_requests().await {
2714 error!(error = ?e, "Error while sending outgoing E2EE requests");
2715 }
2716
2717 let token = match sync_settings.token {
2718 SyncToken::Specific(token) => Some(token),
2719 SyncToken::NoToken => None,
2720 SyncToken::ReusePrevious => self.sync_token().await,
2721 };
2722
2723 let request = assign!(sync_events::v3::Request::new(), {
2724 filter: sync_settings.filter.map(|f| *f),
2725 since: token,
2726 full_state: sync_settings.full_state,
2727 set_presence: sync_settings.set_presence,
2728 timeout: sync_settings.timeout,
2729 use_state_after: true,
2730 });
2731 let mut request_config = self.request_config();
2732 if let Some(timeout) = sync_settings.timeout {
2733 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2734 request_config.timeout = Some(base_timeout + timeout);
2735 }
2736
2737 let response = self.send(request).with_request_config(request_config).await?;
2738 let next_batch = response.next_batch.clone();
2739 let response = self.process_sync(response).await?;
2740
2741 #[cfg(feature = "e2e-encryption")]
2742 if let Err(e) = self.send_outgoing_requests().await {
2743 error!(error = ?e, "Error while sending outgoing E2EE requests");
2744 }
2745
2746 self.inner.sync_beat.notify(usize::MAX);
2747
2748 Ok(SyncResponse::new(next_batch, response))
2749 }
2750
2751 /// Repeatedly synchronize the client state with the server.
2752 ///
2753 /// This method will only return on error, if cancellation is needed
2754 /// the method should be wrapped in a cancelable task or the
2755 /// [`Client::sync_with_callback`] method can be used or
2756 /// [`Client::sync_with_result_callback`] if you want to handle error
2757 /// cases in the loop, too.
2758 ///
2759 /// This method will internally call [`Client::sync_once`] in a loop.
2760 ///
2761 /// This method can be used with the [`Client::add_event_handler`]
2762 /// method to react to individual events. If you instead wish to handle
2763 /// events in a bulk manner the [`Client::sync_with_callback`],
2764 /// [`Client::sync_with_result_callback`] and
2765 /// [`Client::sync_stream`] methods can be used instead. Those methods
2766 /// repeatedly return the whole sync response.
2767 ///
2768 /// # Arguments
2769 ///
2770 /// * `sync_settings` - Settings for the sync call. *Note* that those
2771 /// settings will be only used for the first sync call. See the argument
2772 /// docs for [`Client::sync_once`] for more info.
2773 ///
2774 /// # Return
2775 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2776 /// up to the user of the API to check the error and decide whether the sync
2777 /// should continue or not.
2778 ///
2779 /// # Examples
2780 ///
2781 /// ```no_run
2782 /// # use url::Url;
2783 /// # async {
2784 /// # let homeserver = Url::parse("http://localhost:8080")?;
2785 /// # let username = "";
2786 /// # let password = "";
2787 /// use matrix_sdk::{
2788 /// Client, config::SyncSettings,
2789 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2790 /// };
2791 ///
2792 /// let client = Client::new(homeserver).await?;
2793 /// client.matrix_auth().login_username(&username, &password).send().await?;
2794 ///
2795 /// // Register our handler so we start responding once we receive a new
2796 /// // event.
2797 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2798 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2799 /// });
2800 ///
2801 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2802 /// // automatically.
2803 /// client.sync(SyncSettings::default()).await?;
2804 /// # anyhow::Ok(()) };
2805 /// ```
2806 ///
2807 /// [argument docs]: #method.sync_once
2808 /// [`sync_with_callback`]: #method.sync_with_callback
2809 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2810 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2811 }
2812
2813 /// Repeatedly call sync to synchronize the client state with the server.
2814 ///
2815 /// # Arguments
2816 ///
2817 /// * `sync_settings` - Settings for the sync call. *Note* that those
2818 /// settings will be only used for the first sync call. See the argument
2819 /// docs for [`Client::sync_once`] for more info.
2820 ///
2821 /// * `callback` - A callback that will be called every time a successful
2822 /// response has been fetched from the server. The callback must return a
2823 /// boolean which signalizes if the method should stop syncing. If the
2824 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2825 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2826 ///
2827 /// # Return
2828 /// The sync runs until an error occurs or the
2829 /// callback indicates that the Loop should stop. If the callback asked for
2830 /// a regular stop, the result will be `Ok(())` otherwise the
2831 /// `Err(Error)` is returned.
2832 ///
2833 /// # Examples
2834 ///
2835 /// The following example demonstrates how to sync forever while sending all
2836 /// the interesting events through a mpsc channel to another thread e.g. a
2837 /// UI thread.
2838 ///
2839 /// ```no_run
2840 /// # use std::time::Duration;
2841 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2842 /// # use url::Url;
2843 /// # async {
2844 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2845 /// # let mut client = Client::new(homeserver).await.unwrap();
2846 ///
2847 /// use tokio::sync::mpsc::channel;
2848 ///
2849 /// let (tx, rx) = channel(100);
2850 ///
2851 /// let sync_channel = &tx;
2852 /// let sync_settings = SyncSettings::new()
2853 /// .timeout(Duration::from_secs(30));
2854 ///
2855 /// client
2856 /// .sync_with_callback(sync_settings, |response| async move {
2857 /// let channel = sync_channel;
2858 /// for (room_id, room) in response.rooms.joined {
2859 /// for event in room.timeline.events {
2860 /// channel.send(event).await.unwrap();
2861 /// }
2862 /// }
2863 ///
2864 /// LoopCtrl::Continue
2865 /// })
2866 /// .await;
2867 /// };
2868 /// ```
2869 #[instrument(skip_all)]
2870 pub async fn sync_with_callback<C>(
2871 &self,
2872 sync_settings: crate::config::SyncSettings,
2873 callback: impl Fn(SyncResponse) -> C,
2874 ) -> Result<(), Error>
2875 where
2876 C: Future<Output = LoopCtrl>,
2877 {
2878 self.sync_with_result_callback(sync_settings, |result| async {
2879 Ok(callback(result?).await)
2880 })
2881 .await
2882 }
2883
2884 /// Repeatedly call sync to synchronize the client state with the server.
2885 ///
2886 /// # Arguments
2887 ///
2888 /// * `sync_settings` - Settings for the sync call. *Note* that those
2889 /// settings will be only used for the first sync call. See the argument
2890 /// docs for [`Client::sync_once`] for more info.
2891 ///
2892 /// * `callback` - A callback that will be called every time after a
2893 /// response has been received, failure or not. The callback returns a
2894 /// `Result<LoopCtrl, Error>`, too. When returning
2895 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2896 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2897 /// function returns `Ok(())`. In case the callback can't handle the
2898 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2899 /// which results in the sync ending and the `Err(Error)` being returned.
2900 ///
2901 /// # Return
2902 /// The sync runs until an error occurs that the callback can't handle or
2903 /// the callback indicates that the Loop should stop. If the callback
2904 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2905 /// `Err(Error)` is returned.
2906 ///
2907 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2908 /// this, and are handled first without sending the result to the
2909 /// callback. Only after they have exceeded is the `Result` handed to
2910 /// the callback.
2911 ///
2912 /// # Examples
2913 ///
2914 /// The following example demonstrates how to sync forever while sending all
2915 /// the interesting events through a mpsc channel to another thread e.g. a
2916 /// UI thread.
2917 ///
2918 /// ```no_run
2919 /// # use std::time::Duration;
2920 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2921 /// # use url::Url;
2922 /// # async {
2923 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2924 /// # let mut client = Client::new(homeserver).await.unwrap();
2925 /// #
2926 /// use tokio::sync::mpsc::channel;
2927 ///
2928 /// let (tx, rx) = channel(100);
2929 ///
2930 /// let sync_channel = &tx;
2931 /// let sync_settings = SyncSettings::new()
2932 /// .timeout(Duration::from_secs(30));
2933 ///
2934 /// client
2935 /// .sync_with_result_callback(sync_settings, |response| async move {
2936 /// let channel = sync_channel;
2937 /// let sync_response = response?;
2938 /// for (room_id, room) in sync_response.rooms.joined {
2939 /// for event in room.timeline.events {
2940 /// channel.send(event).await.unwrap();
2941 /// }
2942 /// }
2943 ///
2944 /// Ok(LoopCtrl::Continue)
2945 /// })
2946 /// .await;
2947 /// };
2948 /// ```
2949 #[instrument(skip(self, callback))]
2950 pub async fn sync_with_result_callback<C>(
2951 &self,
2952 sync_settings: crate::config::SyncSettings,
2953 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2954 ) -> Result<(), Error>
2955 where
2956 C: Future<Output = Result<LoopCtrl, Error>>,
2957 {
2958 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2959
2960 while let Some(result) = sync_stream.next().await {
2961 trace!("Running callback");
2962 if callback(result).await? == LoopCtrl::Break {
2963 trace!("Callback told us to stop");
2964 break;
2965 }
2966 trace!("Done running callback");
2967 }
2968
2969 Ok(())
2970 }
2971
2972 //// Repeatedly synchronize the client state with the server.
2973 ///
2974 /// This method will internally call [`Client::sync_once`] in a loop and is
2975 /// equivalent to the [`Client::sync`] method but the responses are provided
2976 /// as an async stream.
2977 ///
2978 /// # Arguments
2979 ///
2980 /// * `sync_settings` - Settings for the sync call. *Note* that those
2981 /// settings will be only used for the first sync call. See the argument
2982 /// docs for [`Client::sync_once`] for more info.
2983 ///
2984 /// # Examples
2985 ///
2986 /// ```no_run
2987 /// # use url::Url;
2988 /// # async {
2989 /// # let homeserver = Url::parse("http://localhost:8080")?;
2990 /// # let username = "";
2991 /// # let password = "";
2992 /// use futures_util::StreamExt;
2993 /// use matrix_sdk::{Client, config::SyncSettings};
2994 ///
2995 /// let client = Client::new(homeserver).await?;
2996 /// client.matrix_auth().login_username(&username, &password).send().await?;
2997 ///
2998 /// let mut sync_stream =
2999 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
3000 ///
3001 /// while let Some(Ok(response)) = sync_stream.next().await {
3002 /// for room in response.rooms.joined.values() {
3003 /// for e in &room.timeline.events {
3004 /// if let Ok(event) = e.raw().deserialize() {
3005 /// println!("Received event {:?}", event);
3006 /// }
3007 /// }
3008 /// }
3009 /// }
3010 ///
3011 /// # anyhow::Ok(()) };
3012 /// ```
3013 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3014 #[instrument(skip(self))]
3015 pub async fn sync_stream(
3016 &self,
3017 mut sync_settings: crate::config::SyncSettings,
3018 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3019 let mut is_first_sync = true;
3020 let mut timeout = None;
3021 let mut last_sync_time: Option<Instant> = None;
3022
3023 let parent_span = Span::current();
3024
3025 async_stream::stream!({
3026 loop {
3027 trace!("Syncing");
3028
3029 if sync_settings.ignore_timeout_on_first_sync {
3030 if is_first_sync {
3031 timeout = sync_settings.timeout.take();
3032 } else if sync_settings.timeout.is_none() && timeout.is_some() {
3033 sync_settings.timeout = timeout.take();
3034 }
3035
3036 is_first_sync = false;
3037 }
3038
3039 yield self
3040 .sync_loop_helper(&mut sync_settings)
3041 .instrument(parent_span.clone())
3042 .await;
3043
3044 Client::delay_sync(&mut last_sync_time).await
3045 }
3046 })
3047 }
3048
3049 /// Get the current, if any, sync token of the client.
3050 /// This will be None if the client didn't sync at least once.
3051 pub(crate) async fn sync_token(&self) -> Option<String> {
3052 self.inner.base_client.sync_token().await
3053 }
3054
3055 /// Gets information about the owner of a given access token.
3056 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3057 let request = whoami::v3::Request::new();
3058 self.send(request).await
3059 }
3060
3061 /// Subscribes a new receiver to client SessionChange broadcasts.
3062 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3063 let broadcast = &self.auth_ctx().session_change_sender;
3064 broadcast.subscribe()
3065 }
3066
3067 /// Sets the save/restore session callbacks.
3068 ///
3069 /// This is another mechanism to get synchronous updates to session tokens,
3070 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3071 pub fn set_session_callbacks(
3072 &self,
3073 reload_session_callback: Box<ReloadSessionCallback>,
3074 save_session_callback: Box<SaveSessionCallback>,
3075 ) -> Result<()> {
3076 self.inner
3077 .auth_ctx
3078 .reload_session_callback
3079 .set(reload_session_callback)
3080 .map_err(|_| Error::MultipleSessionCallbacks)?;
3081
3082 self.inner
3083 .auth_ctx
3084 .save_session_callback
3085 .set(save_session_callback)
3086 .map_err(|_| Error::MultipleSessionCallbacks)?;
3087
3088 Ok(())
3089 }
3090
3091 /// Get the notification settings of the current owner of the client.
3092 pub async fn notification_settings(&self) -> NotificationSettings {
3093 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3094 NotificationSettings::new(self.clone(), ruleset)
3095 }
3096
3097 /// Create a new specialized `Client` that can process notifications.
3098 ///
3099 /// See [`CrossProcessLock::new`] to learn more about
3100 /// `cross_process_lock_config`.
3101 ///
3102 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3103 pub async fn notification_client(
3104 &self,
3105 cross_process_lock_config: CrossProcessLockConfig,
3106 ) -> Result<Client> {
3107 let client = Client {
3108 inner: ClientInner::new(
3109 self.inner.auth_ctx.clone(),
3110 self.server().cloned(),
3111 self.homeserver(),
3112 self.sliding_sync_version(),
3113 self.inner.http_client.clone(),
3114 self.inner
3115 .base_client
3116 .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3117 .await?,
3118 self.inner.caches.supported_versions.read().await.clone(),
3119 self.inner.caches.well_known.read().await.clone(),
3120 self.inner.respect_login_well_known,
3121 self.inner.event_cache.clone(),
3122 self.inner.send_queue_data.clone(),
3123 self.inner.latest_events.clone(),
3124 #[cfg(feature = "e2e-encryption")]
3125 self.inner.e2ee.encryption_settings,
3126 #[cfg(feature = "e2e-encryption")]
3127 self.inner.enable_share_history_on_invite,
3128 cross_process_lock_config,
3129 #[cfg(feature = "experimental-search")]
3130 self.inner.search_index.clone(),
3131 self.inner.thread_subscription_catchup.clone(),
3132 )
3133 .await,
3134 };
3135
3136 Ok(client)
3137 }
3138
3139 /// The [`EventCache`] instance for this [`Client`].
3140 pub fn event_cache(&self) -> &EventCache {
3141 // SAFETY: always initialized in the `Client` ctor.
3142 self.inner.event_cache.get().unwrap()
3143 }
3144
3145 /// The [`LatestEvents`] instance for this [`Client`].
3146 pub async fn latest_events(&self) -> &LatestEvents {
3147 self.inner
3148 .latest_events
3149 .get_or_init(|| async {
3150 LatestEvents::new(
3151 WeakClient::from_client(self),
3152 self.event_cache().clone(),
3153 SendQueue::new(self.clone()),
3154 self.room_info_notable_update_receiver(),
3155 )
3156 })
3157 .await
3158 }
3159
3160 /// Waits until an at least partially synced room is received, and returns
3161 /// it.
3162 ///
3163 /// **Note: this function will loop endlessly until either it finds the room
3164 /// or an externally set timeout happens.**
3165 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3166 loop {
3167 if let Some(room) = self.get_room(room_id) {
3168 if room.is_state_partially_or_fully_synced() {
3169 debug!("Found just created room!");
3170 return room;
3171 }
3172 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3173 } else {
3174 debug!("Room wasn't found, waiting for sync beat to try again");
3175 }
3176 self.inner.sync_beat.listen().await;
3177 }
3178 }
3179
3180 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3181 /// join it.
3182 pub async fn knock(
3183 &self,
3184 room_id_or_alias: OwnedRoomOrAliasId,
3185 reason: Option<String>,
3186 server_names: Vec<OwnedServerName>,
3187 ) -> Result<Room> {
3188 let request =
3189 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3190 let response = self.send(request).await?;
3191 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3192 Ok(Room::new(self.clone(), base_room))
3193 }
3194
3195 /// Checks whether the provided `user_id` belongs to an ignored user.
3196 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3197 self.base_client().is_user_ignored(user_id).await
3198 }
3199
3200 /// Gets the `max_upload_size` value from the homeserver, getting either a
3201 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3202 /// missing.
3203 ///
3204 /// Check the spec for more info:
3205 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3206 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3207 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3208 if let Some(data) = max_upload_size_lock.get() {
3209 return Ok(data.to_owned());
3210 }
3211
3212 // Use the authenticated endpoint when the server supports it.
3213 let supported_versions = self.supported_versions().await?;
3214 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3215 .is_supported(&supported_versions);
3216
3217 let upload_size = if use_auth {
3218 self.send(authenticated_media::get_media_config::v1::Request::default())
3219 .await?
3220 .upload_size
3221 } else {
3222 #[allow(deprecated)]
3223 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3224 };
3225
3226 match max_upload_size_lock.set(upload_size) {
3227 Ok(_) => Ok(upload_size),
3228 Err(error) => {
3229 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3230 }
3231 }
3232 }
3233
3234 /// The settings to use for decrypting events.
3235 #[cfg(feature = "e2e-encryption")]
3236 pub fn decryption_settings(&self) -> &DecryptionSettings {
3237 &self.base_client().decryption_settings
3238 }
3239
3240 /// Returns the [`SearchIndex`] for this [`Client`].
3241 #[cfg(feature = "experimental-search")]
3242 pub fn search_index(&self) -> &SearchIndex {
3243 &self.inner.search_index
3244 }
3245
3246 /// Whether the client is configured to take thread subscriptions (MSC4306
3247 /// and MSC4308) into account.
3248 ///
3249 /// This may cause filtering out of thread subscriptions, and loading the
3250 /// thread subscriptions via the sliding sync extension, when the room
3251 /// list service is being used.
3252 pub fn enabled_thread_subscriptions(&self) -> bool {
3253 match self.base_client().threading_support {
3254 ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
3255 ThreadingSupport::Disabled => false,
3256 }
3257 }
3258
3259 /// Fetch thread subscriptions changes between `from` and up to `to`.
3260 ///
3261 /// The `limit` optional parameter can be used to limit the number of
3262 /// entries in a response. It can also be overridden by the server, if
3263 /// it's deemed too large.
3264 pub async fn fetch_thread_subscriptions(
3265 &self,
3266 from: Option<String>,
3267 to: Option<String>,
3268 limit: Option<UInt>,
3269 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3270 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3271 from,
3272 to,
3273 limit,
3274 });
3275 Ok(self.send(request).await?)
3276 }
3277
3278 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3279 self.inner.thread_subscription_catchup.get().unwrap()
3280 }
3281
3282 /// Perform database optimizations if any are available, i.e. vacuuming in
3283 /// SQLite.
3284 ///
3285 /// **Warning:** this was added to check if SQLite fragmentation was the
3286 /// source of performance issues, **DO NOT use in production**.
3287 #[doc(hidden)]
3288 pub async fn optimize_stores(&self) -> Result<()> {
3289 trace!("Optimizing state store...");
3290 self.state_store().optimize().await?;
3291
3292 trace!("Optimizing event cache store...");
3293 if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3294 clean_lock.optimize().await?;
3295 }
3296
3297 trace!("Optimizing media store...");
3298 self.media_store().lock().await?.optimize().await?;
3299
3300 Ok(())
3301 }
3302
3303 /// Returns the sizes of the existing stores, if known.
3304 pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3305 #[cfg(feature = "e2e-encryption")]
3306 let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3307 && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3308 {
3309 Some(store_size)
3310 } else {
3311 None
3312 };
3313 #[cfg(not(feature = "e2e-encryption"))]
3314 let crypto_store_size = None;
3315
3316 let state_store_size = self.state_store().get_size().await.ok().flatten();
3317
3318 let event_cache_store_size = if let Some(clean_lock) =
3319 self.event_cache_store().lock().await?.as_clean()
3320 && let Ok(Some(store_size)) = clean_lock.get_size().await
3321 {
3322 Some(store_size)
3323 } else {
3324 None
3325 };
3326
3327 let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3328
3329 Ok(StoreSizes {
3330 crypto_store: crypto_store_size,
3331 state_store: state_store_size,
3332 event_cache_store: event_cache_store_size,
3333 media_store: media_store_size,
3334 })
3335 }
3336
3337 /// Get a reference to the client's task monitor, for spawning background
3338 /// tasks.
3339 pub fn task_monitor(&self) -> &TaskMonitor {
3340 &self.inner.task_monitor
3341 }
3342
3343 /// Add a subscriber for duplicate key upload error notifications triggered
3344 /// by requests to /keys/upload.
3345 #[cfg(feature = "e2e-encryption")]
3346 pub fn subscribe_to_duplicate_key_upload_errors(
3347 &self,
3348 ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3349 self.inner.duplicate_key_upload_error_sender.subscribe()
3350 }
3351
3352 /// Check the record of whether we are waiting for an [MSC4268] key bundle
3353 /// for the given room.
3354 ///
3355 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3356 #[cfg(feature = "e2e-encryption")]
3357 pub async fn get_pending_key_bundle_details_for_room(
3358 &self,
3359 room_id: &RoomId,
3360 ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3361 Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3362 }
3363}
3364
3365/// Contains the disk size of the different stores, if known. It won't be
3366/// available for in-memory stores.
3367#[derive(Debug, Clone)]
3368pub struct StoreSizes {
3369 /// The size of the CryptoStore.
3370 pub crypto_store: Option<usize>,
3371 /// The size of the StateStore.
3372 pub state_store: Option<usize>,
3373 /// The size of the EventCacheStore.
3374 pub event_cache_store: Option<usize>,
3375 /// The size of the MediaStore.
3376 pub media_store: Option<usize>,
3377}
3378
3379#[cfg(any(feature = "testing", test))]
3380impl Client {
3381 /// Test helper to mark users as tracked by the crypto layer.
3382 #[cfg(feature = "e2e-encryption")]
3383 pub async fn update_tracked_users_for_testing(
3384 &self,
3385 user_ids: impl IntoIterator<Item = &UserId>,
3386 ) {
3387 let olm = self.olm_machine().await;
3388 let olm = olm.as_ref().unwrap();
3389 olm.update_tracked_users(user_ids).await.unwrap();
3390 }
3391}
3392
3393/// A weak reference to the inner client, useful when trying to get a handle
3394/// on the owning client.
3395#[derive(Clone, Debug)]
3396pub(crate) struct WeakClient {
3397 client: Weak<ClientInner>,
3398}
3399
3400impl WeakClient {
3401 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3402 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3403 Self { client: Arc::downgrade(client) }
3404 }
3405
3406 /// Construct a [`WeakClient`] from a [`Client`].
3407 pub fn from_client(client: &Client) -> Self {
3408 Self::from_inner(&client.inner)
3409 }
3410
3411 /// Attempts to get a [`Client`] from this [`WeakClient`].
3412 pub fn get(&self) -> Option<Client> {
3413 self.client.upgrade().map(|inner| Client { inner })
3414 }
3415
3416 /// Gets the number of strong (`Arc`) pointers still pointing to this
3417 /// client.
3418 #[allow(dead_code)]
3419 pub fn strong_count(&self) -> usize {
3420 self.client.strong_count()
3421 }
3422}
3423
3424/// Information about the state of a room before we joined it.
3425#[derive(Debug, Clone, Default)]
3426struct PreJoinRoomInfo {
3427 /// The user who invited us to the room, if any.
3428 pub inviter: Option<RoomMember>,
3429}
3430
3431// The http mocking library is not supported for wasm32
3432#[cfg(all(test, not(target_family = "wasm")))]
3433pub(crate) mod tests {
3434 use std::{sync::Arc, time::Duration};
3435
3436 use assert_matches::assert_matches;
3437 use assert_matches2::assert_let;
3438 use eyeball::SharedObservable;
3439 use futures_util::{FutureExt, StreamExt, pin_mut};
3440 use js_int::{UInt, uint};
3441 use matrix_sdk_base::{
3442 RoomState,
3443 store::{MemoryStore, StoreConfig},
3444 };
3445 use matrix_sdk_test::{
3446 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3447 event_factory::EventFactory,
3448 };
3449 #[cfg(target_family = "wasm")]
3450 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3451
3452 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3453 use ruma::{
3454 RoomId, ServerName, UserId,
3455 api::{
3456 FeatureFlag, MatrixVersion,
3457 client::{
3458 discovery::discover_homeserver::RtcFocusInfo,
3459 room::create_room::v3::Request as CreateRoomRequest,
3460 },
3461 },
3462 assign,
3463 events::{
3464 ignored_user_list::IgnoredUserListEventContent,
3465 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3466 },
3467 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3468 };
3469 use serde_json::json;
3470 use stream_assert::{assert_next_matches, assert_pending};
3471 use tokio::{
3472 spawn,
3473 time::{sleep, timeout},
3474 };
3475 use url::Url;
3476
3477 use super::Client;
3478 use crate::{
3479 Error, TransmissionProgress,
3480 client::{WeakClient, futures::SendMediaUploadRequest},
3481 config::{RequestConfig, SyncSettings},
3482 futures::SendRequest,
3483 media::MediaError,
3484 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3485 };
3486
3487 #[async_test]
3488 async fn test_account_data() {
3489 let server = MatrixMockServer::new().await;
3490 let client = server.client_builder().build().await;
3491
3492 let f = EventFactory::new();
3493 server
3494 .mock_sync()
3495 .ok_and_run(&client, |builder| {
3496 builder.add_global_account_data(
3497 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3498 );
3499 })
3500 .await;
3501
3502 let content = client
3503 .account()
3504 .account_data::<IgnoredUserListEventContent>()
3505 .await
3506 .unwrap()
3507 .unwrap()
3508 .deserialize()
3509 .unwrap();
3510
3511 assert_eq!(content.ignored_users.len(), 1);
3512 }
3513
3514 #[async_test]
3515 async fn test_successful_discovery() {
3516 // Imagine this is `matrix.org`.
3517 let server = MatrixMockServer::new().await;
3518 let server_url = server.uri();
3519
3520 // Imagine this is `matrix-client.matrix.org`.
3521 let homeserver = MatrixMockServer::new().await;
3522 let homeserver_url = homeserver.uri();
3523
3524 // Imagine Alice has the user ID `@alice:matrix.org`.
3525 let domain = server_url.strip_prefix("http://").unwrap();
3526 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3527
3528 // The `.well-known` is on the server (e.g. `matrix.org`).
3529 server
3530 .mock_well_known()
3531 .ok_with_homeserver_url(&homeserver_url)
3532 .mock_once()
3533 .named("well-known")
3534 .mount()
3535 .await;
3536
3537 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3538 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3539
3540 let client = Client::builder()
3541 .insecure_server_name_no_tls(alice.server_name())
3542 .build()
3543 .await
3544 .unwrap();
3545
3546 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3547 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3548 client.server_versions().await.unwrap();
3549 }
3550
3551 #[async_test]
3552 async fn test_discovery_broken_server() {
3553 let server = MatrixMockServer::new().await;
3554 let server_url = server.uri();
3555 let domain = server_url.strip_prefix("http://").unwrap();
3556 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3557
3558 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3559
3560 assert!(
3561 Client::builder()
3562 .insecure_server_name_no_tls(alice.server_name())
3563 .build()
3564 .await
3565 .is_err(),
3566 "Creating a client from a user ID should fail when the .well-known request fails."
3567 );
3568 }
3569
3570 #[async_test]
3571 async fn test_room_creation() {
3572 let server = MatrixMockServer::new().await;
3573 let client = server.client_builder().build().await;
3574
3575 let f = EventFactory::new().sender(user_id!("@example:localhost"));
3576 server
3577 .mock_sync()
3578 .ok_and_run(&client, |builder| {
3579 builder.add_joined_room(
3580 JoinedRoomBuilder::default()
3581 .add_state_event(
3582 f.member(user_id!("@example:localhost")).display_name("example"),
3583 )
3584 .add_state_event(f.default_power_levels()),
3585 );
3586 })
3587 .await;
3588
3589 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3590 assert_eq!(room.state(), RoomState::Joined);
3591 }
3592
3593 #[async_test]
3594 async fn test_retry_limit_http_requests() {
3595 let server = MatrixMockServer::new().await;
3596 let client = server
3597 .client_builder()
3598 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3599 .build()
3600 .await;
3601
3602 assert!(client.request_config().retry_limit.unwrap() == 4);
3603
3604 server.mock_who_am_i().error500().expect(4).mount().await;
3605
3606 client.whoami().await.unwrap_err();
3607 }
3608
3609 #[async_test]
3610 async fn test_retry_timeout_http_requests() {
3611 // Keep this timeout small so that the test doesn't take long
3612 let retry_timeout = Duration::from_secs(5);
3613 let server = MatrixMockServer::new().await;
3614 let client = server
3615 .client_builder()
3616 .on_builder(|builder| {
3617 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3618 })
3619 .build()
3620 .await;
3621
3622 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3623
3624 server.mock_login().error500().expect(2..).mount().await;
3625
3626 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3627 }
3628
3629 #[async_test]
3630 async fn test_short_retry_initial_http_requests() {
3631 let server = MatrixMockServer::new().await;
3632 let client = server
3633 .client_builder()
3634 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3635 .build()
3636 .await;
3637
3638 server.mock_login().error500().expect(3..).mount().await;
3639
3640 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3641 }
3642
3643 #[async_test]
3644 async fn test_no_retry_http_requests() {
3645 let server = MatrixMockServer::new().await;
3646 let client = server.client_builder().build().await;
3647
3648 server.mock_devices().error500().mock_once().mount().await;
3649
3650 client.devices().await.unwrap_err();
3651 }
3652
3653 #[async_test]
3654 async fn test_set_homeserver() {
3655 let client = MockClientBuilder::new(None).build().await;
3656 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3657
3658 let homeserver = Url::parse("http://example.com/").unwrap();
3659 client.set_homeserver(homeserver.clone());
3660 assert_eq!(client.homeserver(), homeserver);
3661 }
3662
3663 #[async_test]
3664 async fn test_search_user_request() {
3665 let server = MatrixMockServer::new().await;
3666 let client = server.client_builder().build().await;
3667
3668 server.mock_user_directory().ok().mock_once().mount().await;
3669
3670 let response = client.search_users("test", 50).await.unwrap();
3671 assert_eq!(response.results.len(), 1);
3672 let result = &response.results[0];
3673 assert_eq!(result.user_id.to_string(), "@test:example.me");
3674 assert_eq!(result.display_name.clone().unwrap(), "Test");
3675 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3676 assert!(!response.limited);
3677 }
3678
3679 #[async_test]
3680 async fn test_request_unstable_features() {
3681 let server = MatrixMockServer::new().await;
3682 let client = server.client_builder().no_server_versions().build().await;
3683
3684 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3685
3686 let unstable_features = client.unstable_features().await.unwrap();
3687 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3688 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3689 }
3690
3691 #[async_test]
3692 async fn test_can_homeserver_push_encrypted_event_to_device() {
3693 let server = MatrixMockServer::new().await;
3694 let client = server.client_builder().no_server_versions().build().await;
3695
3696 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3697
3698 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3699 assert!(msc4028_enabled);
3700 }
3701
3702 #[async_test]
3703 async fn test_recently_visited_rooms() {
3704 // Tracking recently visited rooms requires authentication
3705 let client = MockClientBuilder::new(None).unlogged().build().await;
3706 assert_matches!(
3707 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3708 Err(Error::AuthenticationRequired)
3709 );
3710
3711 let client = MockClientBuilder::new(None).build().await;
3712 let account = client.account();
3713
3714 // We should start off with an empty list
3715 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3716
3717 // Tracking a valid room id should add it to the list
3718 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3719 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3720 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3721
3722 // And the existing list shouldn't be changed
3723 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3724 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3725
3726 // Tracking the same room again shouldn't change the list
3727 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3728 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3729 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3730
3731 // Tracking a second room should add it to the front of the list
3732 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3733 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3734 assert_eq!(
3735 account.get_recently_visited_rooms().await.unwrap(),
3736 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3737 );
3738
3739 // Tracking the first room yet again should move it to the front of the list
3740 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3741 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3742 assert_eq!(
3743 account.get_recently_visited_rooms().await.unwrap(),
3744 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3745 );
3746
3747 // Tracking should be capped at 20
3748 for n in 0..20 {
3749 account
3750 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3751 .await
3752 .unwrap();
3753 }
3754
3755 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3756
3757 // And the initial rooms should've been pushed out
3758 let rooms = account.get_recently_visited_rooms().await.unwrap();
3759 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3760 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3761
3762 // And the last tracked room should be the first
3763 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3764 }
3765
3766 #[async_test]
3767 async fn test_client_no_cycle_with_event_cache() {
3768 let client = MockClientBuilder::new(None).build().await;
3769
3770 // Wait for the init tasks to die.
3771 sleep(Duration::from_secs(1)).await;
3772
3773 let weak_client = WeakClient::from_client(&client);
3774 assert_eq!(weak_client.strong_count(), 1);
3775
3776 {
3777 let room_id = room_id!("!room:example.org");
3778
3779 // Have the client know the room.
3780 let response = SyncResponseBuilder::default()
3781 .add_joined_room(JoinedRoomBuilder::new(room_id))
3782 .build_sync_response();
3783 client.inner.base_client.receive_sync_response(response).await.unwrap();
3784
3785 client.event_cache().subscribe().unwrap();
3786
3787 let (_room_event_cache, _drop_handles) =
3788 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3789 }
3790
3791 drop(client);
3792
3793 // Give a bit of time for background tasks to die.
3794 sleep(Duration::from_secs(1)).await;
3795
3796 // The weak client must be the last reference to the client now.
3797 assert_eq!(weak_client.strong_count(), 0);
3798 let client = weak_client.get();
3799 assert!(
3800 client.is_none(),
3801 "too many strong references to the client: {}",
3802 Arc::strong_count(&client.unwrap().inner)
3803 );
3804 }
3805
3806 #[async_test]
3807 async fn test_supported_versions_caching() {
3808 let server = MatrixMockServer::new().await;
3809
3810 let versions_mock = server
3811 .mock_versions()
3812 .expect_default_access_token()
3813 .ok_with_unstable_features()
3814 .named("first versions mock")
3815 .expect(1)
3816 .mount_as_scoped()
3817 .await;
3818
3819 let memory_store = Arc::new(MemoryStore::new());
3820 let client = server
3821 .client_builder()
3822 .no_server_versions()
3823 .on_builder(|builder| {
3824 builder.store_config(
3825 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3826 .state_store(memory_store.clone()),
3827 )
3828 })
3829 .build()
3830 .await;
3831
3832 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3833
3834 // The result was cached.
3835 assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3836 // This subsequent call hits the in-memory cache.
3837 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3838
3839 drop(client);
3840
3841 let client = server
3842 .client_builder()
3843 .no_server_versions()
3844 .on_builder(|builder| {
3845 builder.store_config(
3846 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3847 .state_store(memory_store.clone()),
3848 )
3849 })
3850 .build()
3851 .await;
3852
3853 // These calls to the new client hit the on-disk cache.
3854 assert!(
3855 client
3856 .unstable_features()
3857 .await
3858 .unwrap()
3859 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3860 );
3861 let supported = client.supported_versions().await.unwrap();
3862 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3863 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3864
3865 // Then this call hits the in-memory cache.
3866 let supported = client.supported_versions().await.unwrap();
3867 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3868 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3869
3870 drop(versions_mock);
3871
3872 // Now, reset the cache, and observe the endpoints being called again once.
3873 client.reset_supported_versions().await.unwrap();
3874
3875 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3876
3877 // Hits network again.
3878 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3879 // Hits in-memory cache again.
3880 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3881 }
3882
3883 #[async_test]
3884 async fn test_well_known_caching() {
3885 let server = MatrixMockServer::new().await;
3886 let server_url = server.uri();
3887 let domain = server_url.strip_prefix("http://").unwrap();
3888 let server_name = <&ServerName>::try_from(domain).unwrap();
3889 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3890
3891 let well_known_mock = server
3892 .mock_well_known()
3893 .ok()
3894 .named("well known mock")
3895 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3896 .mount_as_scoped()
3897 .await;
3898
3899 let memory_store = Arc::new(MemoryStore::new());
3900 let client = Client::builder()
3901 .insecure_server_name_no_tls(server_name)
3902 .store_config(
3903 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3904 .state_store(memory_store.clone()),
3905 )
3906 .build()
3907 .await
3908 .unwrap();
3909
3910 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3911
3912 // This subsequent call hits the in-memory cache.
3913 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3914
3915 drop(client);
3916
3917 let client = server
3918 .client_builder()
3919 .no_server_versions()
3920 .on_builder(|builder| {
3921 builder.store_config(
3922 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3923 .state_store(memory_store.clone()),
3924 )
3925 })
3926 .build()
3927 .await;
3928
3929 // This call to the new client hits the on-disk cache.
3930 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3931
3932 // Then this call hits the in-memory cache.
3933 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3934
3935 drop(well_known_mock);
3936
3937 // Now, reset the cache, and observe the endpoints being called again once.
3938 client.reset_well_known().await.unwrap();
3939
3940 server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3941
3942 // Hits network again.
3943 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3944 // Hits in-memory cache again.
3945 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3946 }
3947
3948 #[async_test]
3949 async fn test_missing_well_known_caching() {
3950 let server = MatrixMockServer::new().await;
3951 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3952
3953 let well_known_mock = server
3954 .mock_well_known()
3955 .error_unrecognized()
3956 .named("first well-known mock")
3957 .expect(1)
3958 .mount_as_scoped()
3959 .await;
3960
3961 let memory_store = Arc::new(MemoryStore::new());
3962 let client = server
3963 .client_builder()
3964 .on_builder(|builder| {
3965 builder.store_config(
3966 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3967 .state_store(memory_store.clone()),
3968 )
3969 })
3970 .build()
3971 .await;
3972
3973 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3974
3975 // This subsequent call hits the in-memory cache.
3976 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3977
3978 drop(client);
3979
3980 let client = server
3981 .client_builder()
3982 .on_builder(|builder| {
3983 builder.store_config(
3984 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3985 .state_store(memory_store.clone()),
3986 )
3987 })
3988 .build()
3989 .await;
3990
3991 // This call to the new client hits the on-disk cache.
3992 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3993
3994 // Then this call hits the in-memory cache.
3995 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3996
3997 drop(well_known_mock);
3998
3999 // Now, reset the cache, and observe the endpoints being called again once.
4000 client.reset_well_known().await.unwrap();
4001
4002 server
4003 .mock_well_known()
4004 .error_unrecognized()
4005 .expect(1)
4006 .named("second well-known mock")
4007 .mount()
4008 .await;
4009
4010 // Hits network again.
4011 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4012 // Hits in-memory cache again.
4013 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4014 }
4015
4016 #[async_test]
4017 async fn test_no_network_doesnt_cause_infinite_retries() {
4018 // We want infinite retries for transient errors.
4019 let client = MockClientBuilder::new(None)
4020 .on_builder(|builder| builder.request_config(RequestConfig::new()))
4021 .build()
4022 .await;
4023
4024 // We don't define a mock server on purpose here, so that the error is really a
4025 // network error.
4026 client.whoami().await.unwrap_err();
4027 }
4028
4029 #[async_test]
4030 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4031 let server = MatrixMockServer::new().await;
4032 let client = server.client_builder().build().await;
4033
4034 let room_id = room_id!("!room:example.org");
4035
4036 server
4037 .mock_sync()
4038 .ok_and_run(&client, |builder| {
4039 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4040 })
4041 .await;
4042
4043 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4044 assert_eq!(room.room_id(), room_id);
4045 }
4046
4047 #[async_test]
4048 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4049 let server = MatrixMockServer::new().await;
4050 let client = server.client_builder().build().await;
4051
4052 let room_id = room_id!("!room:example.org");
4053
4054 let client = Arc::new(client);
4055
4056 // Perform the /sync request with a delay so it starts after the
4057 // `await_room_remote_echo` call has happened
4058 spawn({
4059 let client = client.clone();
4060 async move {
4061 sleep(Duration::from_millis(100)).await;
4062
4063 server
4064 .mock_sync()
4065 .ok_and_run(&client, |builder| {
4066 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4067 })
4068 .await;
4069 }
4070 });
4071
4072 let room =
4073 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4074 assert_eq!(room.room_id(), room_id);
4075 }
4076
4077 #[async_test]
4078 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4079 let client = MockClientBuilder::new(None).build().await;
4080
4081 let room_id = room_id!("!room:example.org");
4082 // Room is not present so the client won't be able to find it. The call will
4083 // timeout.
4084 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4085 }
4086
4087 #[async_test]
4088 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4089 let server = MatrixMockServer::new().await;
4090 let client = server.client_builder().build().await;
4091
4092 server.mock_create_room().ok().mount().await;
4093
4094 // Create a room in the internal store
4095 let room = client
4096 .create_room(assign!(CreateRoomRequest::new(), {
4097 invite: vec![],
4098 is_direct: false,
4099 }))
4100 .await
4101 .unwrap();
4102
4103 // Room is locally present, but not synced, the call will timeout
4104 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4105 .await
4106 .unwrap_err();
4107 }
4108
4109 #[async_test]
4110 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4111 let server = MatrixMockServer::new().await;
4112 let client = server.client_builder().build().await;
4113
4114 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4115
4116 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4117 assert_matches!(ret, Ok(true));
4118 }
4119
4120 #[async_test]
4121 async fn test_is_room_alias_available_if_alias_is_resolved() {
4122 let server = MatrixMockServer::new().await;
4123 let client = server.client_builder().build().await;
4124
4125 server
4126 .mock_room_directory_resolve_alias()
4127 .ok("!some_room_id:matrix.org", Vec::new())
4128 .expect(1)
4129 .mount()
4130 .await;
4131
4132 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4133 assert_matches!(ret, Ok(false));
4134 }
4135
4136 #[async_test]
4137 async fn test_is_room_alias_available_if_error_found() {
4138 let server = MatrixMockServer::new().await;
4139 let client = server.client_builder().build().await;
4140
4141 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4142
4143 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4144 assert_matches!(ret, Err(_));
4145 }
4146
4147 #[async_test]
4148 async fn test_create_room_alias() {
4149 let server = MatrixMockServer::new().await;
4150 let client = server.client_builder().build().await;
4151
4152 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4153
4154 let ret = client
4155 .create_room_alias(
4156 room_alias_id!("#some_alias:matrix.org"),
4157 room_id!("!some_room:matrix.org"),
4158 )
4159 .await;
4160 assert_matches!(ret, Ok(()));
4161 }
4162
4163 #[async_test]
4164 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4165 let server = MatrixMockServer::new().await;
4166 let client = server.client_builder().build().await;
4167
4168 let room_id = room_id!("!a-room:matrix.org");
4169
4170 // Make sure the summary endpoint is called once
4171 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4172
4173 // We create a locally cached invited room
4174 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4175
4176 // And we get a preview, the server endpoint was reached
4177 let preview = client
4178 .get_room_preview(room_id.into(), Vec::new())
4179 .await
4180 .expect("Room preview should be retrieved");
4181
4182 assert_eq!(invited_room.room_id(), preview.room_id);
4183 }
4184
4185 #[async_test]
4186 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4187 let server = MatrixMockServer::new().await;
4188 let client = server.client_builder().build().await;
4189
4190 let room_id = room_id!("!a-room:matrix.org");
4191
4192 // Make sure the summary endpoint is called once
4193 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4194
4195 // We create a locally cached left room
4196 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4197
4198 // And we get a preview, the server endpoint was reached
4199 let preview = client
4200 .get_room_preview(room_id.into(), Vec::new())
4201 .await
4202 .expect("Room preview should be retrieved");
4203
4204 assert_eq!(left_room.room_id(), preview.room_id);
4205 }
4206
4207 #[async_test]
4208 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4209 let server = MatrixMockServer::new().await;
4210 let client = server.client_builder().build().await;
4211
4212 let room_id = room_id!("!a-room:matrix.org");
4213
4214 // Make sure the summary endpoint is called once
4215 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4216
4217 // We create a locally cached knocked room
4218 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4219
4220 // And we get a preview, the server endpoint was reached
4221 let preview = client
4222 .get_room_preview(room_id.into(), Vec::new())
4223 .await
4224 .expect("Room preview should be retrieved");
4225
4226 assert_eq!(knocked_room.room_id(), preview.room_id);
4227 }
4228
4229 #[async_test]
4230 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4231 let server = MatrixMockServer::new().await;
4232 let client = server.client_builder().build().await;
4233
4234 let room_id = room_id!("!a-room:matrix.org");
4235
4236 // Make sure the summary endpoint is not called
4237 server.mock_room_summary().ok(room_id).never().mount().await;
4238
4239 // We create a locally cached joined room
4240 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4241
4242 // And we get a preview, no server endpoint was reached
4243 let preview = client
4244 .get_room_preview(room_id.into(), Vec::new())
4245 .await
4246 .expect("Room preview should be retrieved");
4247
4248 assert_eq!(joined_room.room_id(), preview.room_id);
4249 }
4250
4251 #[async_test]
4252 async fn test_media_preview_config() {
4253 let server = MatrixMockServer::new().await;
4254 let client = server.client_builder().build().await;
4255
4256 server
4257 .mock_sync()
4258 .ok_and_run(&client, |builder| {
4259 builder.add_custom_global_account_data(json!({
4260 "content": {
4261 "media_previews": "private",
4262 "invite_avatars": "off"
4263 },
4264 "type": "m.media_preview_config"
4265 }));
4266 })
4267 .await;
4268
4269 let (initial_value, stream) =
4270 client.account().observe_media_preview_config().await.unwrap();
4271
4272 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4273 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4274 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4275 pin_mut!(stream);
4276 assert_pending!(stream);
4277
4278 server
4279 .mock_sync()
4280 .ok_and_run(&client, |builder| {
4281 builder.add_custom_global_account_data(json!({
4282 "content": {
4283 "media_previews": "off",
4284 "invite_avatars": "on"
4285 },
4286 "type": "m.media_preview_config"
4287 }));
4288 })
4289 .await;
4290
4291 assert_next_matches!(
4292 stream,
4293 MediaPreviewConfigEventContent {
4294 media_previews: Some(MediaPreviews::Off),
4295 invite_avatars: Some(InviteAvatars::On),
4296 ..
4297 }
4298 );
4299 assert_pending!(stream);
4300 }
4301
4302 #[async_test]
4303 async fn test_unstable_media_preview_config() {
4304 let server = MatrixMockServer::new().await;
4305 let client = server.client_builder().build().await;
4306
4307 server
4308 .mock_sync()
4309 .ok_and_run(&client, |builder| {
4310 builder.add_custom_global_account_data(json!({
4311 "content": {
4312 "media_previews": "private",
4313 "invite_avatars": "off"
4314 },
4315 "type": "io.element.msc4278.media_preview_config"
4316 }));
4317 })
4318 .await;
4319
4320 let (initial_value, stream) =
4321 client.account().observe_media_preview_config().await.unwrap();
4322
4323 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4324 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4325 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4326 pin_mut!(stream);
4327 assert_pending!(stream);
4328
4329 server
4330 .mock_sync()
4331 .ok_and_run(&client, |builder| {
4332 builder.add_custom_global_account_data(json!({
4333 "content": {
4334 "media_previews": "off",
4335 "invite_avatars": "on"
4336 },
4337 "type": "io.element.msc4278.media_preview_config"
4338 }));
4339 })
4340 .await;
4341
4342 assert_next_matches!(
4343 stream,
4344 MediaPreviewConfigEventContent {
4345 media_previews: Some(MediaPreviews::Off),
4346 invite_avatars: Some(InviteAvatars::On),
4347 ..
4348 }
4349 );
4350 assert_pending!(stream);
4351 }
4352
4353 #[async_test]
4354 async fn test_media_preview_config_not_found() {
4355 let server = MatrixMockServer::new().await;
4356 let client = server.client_builder().build().await;
4357
4358 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4359
4360 assert!(initial_value.is_none());
4361 }
4362
4363 #[async_test]
4364 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4365 // The default Matrix version we use is 1.11 or higher, so authenticated media
4366 // is supported.
4367 let server = MatrixMockServer::new().await;
4368 let client = server.client_builder().build().await;
4369
4370 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4371
4372 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4373 client.load_or_fetch_max_upload_size().await.unwrap();
4374
4375 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4376 }
4377
4378 #[async_test]
4379 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4380 // The server must advertise support for the stable feature for authenticated
4381 // media support, so we mock the `GET /versions` response.
4382 let server = MatrixMockServer::new().await;
4383 let client = server.client_builder().no_server_versions().build().await;
4384
4385 server
4386 .mock_versions()
4387 .ok_custom(
4388 &["v1.7", "v1.8", "v1.9", "v1.10"],
4389 &[("org.matrix.msc3916.stable", true)].into(),
4390 )
4391 .named("versions")
4392 .expect(1)
4393 .mount()
4394 .await;
4395
4396 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4397
4398 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4399 client.load_or_fetch_max_upload_size().await.unwrap();
4400
4401 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4402 }
4403
4404 #[async_test]
4405 async fn test_load_or_fetch_max_upload_size_no_auth() {
4406 // The server must not support Matrix 1.11 or higher for unauthenticated
4407 // media requests, so we mock the `GET /versions` response.
4408 let server = MatrixMockServer::new().await;
4409 let client = server.client_builder().no_server_versions().build().await;
4410
4411 server
4412 .mock_versions()
4413 .ok_custom(&["v1.1"], &Default::default())
4414 .named("versions")
4415 .expect(1)
4416 .mount()
4417 .await;
4418
4419 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4420
4421 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4422 client.load_or_fetch_max_upload_size().await.unwrap();
4423
4424 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4425 }
4426
4427 #[async_test]
4428 async fn test_uploading_a_too_large_media_file() {
4429 let server = MatrixMockServer::new().await;
4430 let client = server.client_builder().build().await;
4431
4432 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4433 client.load_or_fetch_max_upload_size().await.unwrap();
4434 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4435
4436 let data = vec![1, 2];
4437 let upload_request =
4438 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4439 let request = SendRequest {
4440 client: client.clone(),
4441 request: upload_request,
4442 config: None,
4443 send_progress: SharedObservable::new(TransmissionProgress::default()),
4444 };
4445 let media_request = SendMediaUploadRequest::new(request);
4446
4447 let error = media_request.await.err();
4448 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4449 assert_eq!(max, uint!(1));
4450 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4451 }
4452
4453 #[async_test]
4454 async fn test_dont_ignore_timeout_on_first_sync() {
4455 let server = MatrixMockServer::new().await;
4456 let client = server.client_builder().build().await;
4457
4458 server
4459 .mock_sync()
4460 .timeout(Some(Duration::from_secs(30)))
4461 .ok(|_| {})
4462 .mock_once()
4463 .named("sync_with_timeout")
4464 .mount()
4465 .await;
4466
4467 // Call the endpoint once to check the timeout.
4468 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4469
4470 timeout(Duration::from_secs(1), async {
4471 stream.next().await.unwrap().unwrap();
4472 })
4473 .await
4474 .unwrap();
4475 }
4476
4477 #[async_test]
4478 async fn test_ignore_timeout_on_first_sync() {
4479 let server = MatrixMockServer::new().await;
4480 let client = server.client_builder().build().await;
4481
4482 server
4483 .mock_sync()
4484 .timeout(None)
4485 .ok(|_| {})
4486 .mock_once()
4487 .named("sync_no_timeout")
4488 .mount()
4489 .await;
4490 server
4491 .mock_sync()
4492 .timeout(Some(Duration::from_secs(30)))
4493 .ok(|_| {})
4494 .mock_once()
4495 .named("sync_with_timeout")
4496 .mount()
4497 .await;
4498
4499 // Call each version of the endpoint once to check the timeouts.
4500 let mut stream = Box::pin(
4501 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4502 );
4503
4504 timeout(Duration::from_secs(1), async {
4505 stream.next().await.unwrap().unwrap();
4506 stream.next().await.unwrap().unwrap();
4507 })
4508 .await
4509 .unwrap();
4510 }
4511
4512 #[async_test]
4513 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4514 let server = MatrixMockServer::new().await;
4515 let client = server.client_builder().build().await;
4516 // This is the user ID that is inside MemberAdditional.
4517 // Note the confusing username, so we can share
4518 // GlobalAccountDataTestEvent::Direct with the invited test.
4519 let user_id = user_id!("@invited:localhost");
4520
4521 // When we receive a sync response saying "invited" is invited to a DM
4522 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4523 let response = SyncResponseBuilder::default()
4524 .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4525 .add_global_account_data(
4526 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4527 )
4528 .build_sync_response();
4529 client.base_client().receive_sync_response(response).await.unwrap();
4530
4531 // Then get_dm_room finds this room
4532 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4533 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4534 }
4535
4536 #[async_test]
4537 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4538 let server = MatrixMockServer::new().await;
4539 let client = server.client_builder().build().await;
4540 // This is the user ID that is inside MemberInvite
4541 let user_id = user_id!("@invited:localhost");
4542
4543 // When we receive a sync response saying "invited" is invited to a DM
4544 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4545 let response = SyncResponseBuilder::default()
4546 .add_joined_room(
4547 JoinedRoomBuilder::default()
4548 .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4549 )
4550 .add_global_account_data(
4551 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4552 )
4553 .build_sync_response();
4554 client.base_client().receive_sync_response(response).await.unwrap();
4555
4556 // Then get_dm_room finds this room
4557 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4558 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4559 }
4560
4561 #[async_test]
4562 async fn test_get_dm_room_still_finds_left_room() {
4563 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4564 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4565
4566 let server = MatrixMockServer::new().await;
4567 let client = server.client_builder().build().await;
4568 // This is the user ID that is inside MemberAdditional.
4569 // Note the confusing username, so we can share
4570 // GlobalAccountDataTestEvent::Direct with the invited test.
4571 let user_id = user_id!("@invited:localhost");
4572
4573 // When we receive a sync response saying "invited" has left a DM
4574 let f = EventFactory::new().sender(user_id);
4575 let response = SyncResponseBuilder::default()
4576 .add_joined_room(
4577 JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4578 )
4579 .add_global_account_data(
4580 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4581 )
4582 .build_sync_response();
4583 client.base_client().receive_sync_response(response).await.unwrap();
4584
4585 // Then get_dm_room finds this room
4586 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4587 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4588 }
4589
4590 #[async_test]
4591 async fn test_device_exists() {
4592 let server = MatrixMockServer::new().await;
4593 let client = server.client_builder().build().await;
4594
4595 server.mock_get_device().ok().expect(1).mount().await;
4596
4597 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4598 }
4599
4600 #[async_test]
4601 async fn test_device_exists_404() {
4602 let server = MatrixMockServer::new().await;
4603 let client = server.client_builder().build().await;
4604
4605 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4606 }
4607
4608 #[async_test]
4609 async fn test_device_exists_500() {
4610 let server = MatrixMockServer::new().await;
4611 let client = server.client_builder().build().await;
4612
4613 server.mock_get_device().error500().expect(1).mount().await;
4614
4615 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4616 }
4617
4618 #[async_test]
4619 async fn test_fetching_well_known_with_homeserver_url() {
4620 let server = MatrixMockServer::new().await;
4621 let client = server.client_builder().build().await;
4622 server.mock_well_known().ok().mount().await;
4623
4624 assert_matches!(client.fetch_client_well_known().await, Some(_));
4625 }
4626
4627 #[async_test]
4628 async fn test_fetching_well_known_with_server_name() {
4629 let server = MatrixMockServer::new().await;
4630 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4631
4632 server.mock_well_known().ok().mount().await;
4633
4634 let client = MockClientBuilder::new(None)
4635 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4636 .build()
4637 .await;
4638
4639 assert_matches!(client.fetch_client_well_known().await, Some(_));
4640 }
4641
4642 #[async_test]
4643 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4644 let server = MatrixMockServer::new().await;
4645 server.mock_well_known().ok().mount().await;
4646
4647 let user_id =
4648 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4649 let client = MockClientBuilder::new(None)
4650 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4651 .build()
4652 .await;
4653
4654 assert_matches!(client.fetch_client_well_known().await, Some(_));
4655 }
4656}