matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::{StreamExt, join};
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32 DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35 BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36 SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37 SyncOutsideWasm, ThreadingSupport,
38 event_cache::store::EventCacheStoreLock,
39 media::store::MediaStoreLock,
40 store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41 sync::{Notification, RoomUpdates},
42 task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50 api::{
51 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52 client::{
53 account::whoami,
54 alias::{create_alias, delete_alias, get_alias},
55 authenticated_media,
56 device::{self, delete_devices, get_devices, update_device},
57 directory::{get_public_rooms, get_public_rooms_filtered},
58 discovery::{
59 discover_homeserver::{self, RtcFocusInfo},
60 get_supported_versions,
61 },
62 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
63 knock::knock_room,
64 media,
65 membership::{join_room_by_id, join_room_by_id_or_alias},
66 room::create_room,
67 session::login::v3::DiscoveryInfo,
68 sync::sync_events,
69 threads::get_thread_subscriptions_changes,
70 uiaa,
71 user_directory::search_users,
72 },
73 error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
74 path_builder::PathBuilder,
75 },
76 assign,
77 events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
78 push::Ruleset,
79 time::Instant,
80};
81use serde::de::DeserializeOwned;
82use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
83use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
84use url::Url;
85
86use self::{
87 caches::{Cache, CachedValue, ClientCaches},
88 futures::SendRequest,
89};
90use crate::{
91 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
92 Room, SessionTokens, TransmissionProgress,
93 authentication::{
94 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
95 oauth::OAuth,
96 },
97 client::{
98 homeserver_capabilities::HomeserverCapabilities,
99 thread_subscriptions::ThreadSubscriptionCatchup,
100 },
101 config::{RequestConfig, SyncToken},
102 deduplicating_handler::DeduplicatingHandler,
103 error::HttpResult,
104 event_cache::EventCache,
105 event_handler::{
106 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
107 EventHandlerStore, ObservableEventHandler, SyncEvent,
108 },
109 http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
110 latest_events::LatestEvents,
111 live_locations_observer::BeaconInfoUpdate,
112 media::MediaError,
113 notification_settings::NotificationSettings,
114 room::RoomMember,
115 room_preview::RoomPreview,
116 send_queue::{SendQueue, SendQueueData},
117 sliding_sync::Version as SlidingSyncVersion,
118 sync::{RoomUpdate, SyncResponse},
119};
120#[cfg(feature = "e2e-encryption")]
121use crate::{
122 cross_process_lock::CrossProcessLock,
123 encryption::{
124 DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
125 VerificationState,
126 },
127};
128
129mod builder;
130pub(crate) mod caches;
131pub(crate) mod futures;
132pub(crate) mod homeserver_capabilities;
133pub(crate) mod thread_subscriptions;
134
135pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
136#[cfg(feature = "experimental-search")]
137use crate::search_index::SearchIndex;
138
139#[cfg(not(target_family = "wasm"))]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
141#[cfg(target_family = "wasm")]
142type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
143
144#[cfg(not(target_family = "wasm"))]
145type NotificationHandlerFn =
146 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
147#[cfg(target_family = "wasm")]
148type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
149
/// Tells a callback-driven loop whether it should keep running or stop.
///
/// The main user is the [`sync_with_callback`] method: the value returned by
/// the provided callback decides whether the sync loop is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Keep the loop running.
    Continue,
    /// Exit the loop.
    Break,
}
163
164/// Represents changes that can occur to a `Client`s `Session`.
165#[derive(Debug, Clone, PartialEq)]
166pub enum SessionChange {
167 /// The session's token is no longer valid.
168 UnknownToken(UnknownTokenErrorData),
169 /// The session's tokens have been refreshed.
170 TokensRefreshed,
171}
172
/// Server vendor information, as obtained from the federation API.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// Name of the server.
    pub server_name: String,
    /// Version of the server.
    pub version: String,
}
182
183/// An async/await enabled Matrix client.
184///
185/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
186#[derive(Clone)]
187pub struct Client {
188 pub(crate) inner: Arc<ClientInner>,
189}
190
191#[derive(Default)]
192pub(crate) struct ClientLocks {
193 /// Lock ensuring that only a single room may be marked as a DM at once.
194 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
195 /// explanation.
196 pub(crate) mark_as_dm_lock: Mutex<()>,
197
198 /// Lock ensuring that only a single secret store is getting opened at the
199 /// same time.
200 ///
201 /// This is important so we don't accidentally create multiple different new
202 /// default secret storage keys.
203 #[cfg(feature = "e2e-encryption")]
204 pub(crate) open_secret_store_lock: Mutex<()>,
205
206 /// Lock ensuring that we're only storing a single secret at a time.
207 ///
208 /// Take a look at the [`SecretStore::put_secret`] method for a more
209 /// detailed explanation.
210 ///
211 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
212 #[cfg(feature = "e2e-encryption")]
213 pub(crate) store_secret_lock: Mutex<()>,
214
215 /// Lock ensuring that only one method at a time might modify our backup.
216 #[cfg(feature = "e2e-encryption")]
217 pub(crate) backup_modify_lock: Mutex<()>,
218
219 /// Lock ensuring that we're going to attempt to upload backups for a single
220 /// requester.
221 #[cfg(feature = "e2e-encryption")]
222 pub(crate) backup_upload_lock: Mutex<()>,
223
224 /// Handler making sure we only have one group session sharing request in
225 /// flight per room.
226 #[cfg(feature = "e2e-encryption")]
227 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
228
229 /// Lock making sure we're only doing one key claim request at a time.
230 #[cfg(feature = "e2e-encryption")]
231 pub(crate) key_claim_lock: Mutex<()>,
232
233 /// Handler to ensure that only one members request is running at a time,
234 /// given a room.
235 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
236
237 /// Handler to ensure that only one encryption state request is running at a
238 /// time, given a room.
239 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
240
241 /// Deduplicating handler for sending read receipts. The string is an
242 /// internal implementation detail, see [`Self::send_single_receipt`].
243 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
244
245 #[cfg(feature = "e2e-encryption")]
246 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
247
248 /// Latest "generation" of data known by the crypto store.
249 ///
250 /// This is a counter that only increments, set in the database (and can
251 /// wrap). It's incremented whenever some process acquires a lock for the
252 /// first time. *This assumes the crypto store lock is being held, to
253 /// avoid data races on writing to this value in the store*.
254 ///
255 /// The current process will maintain this value in local memory and in the
256 /// DB over time. Observing a different value than the one read in
257 /// memory, when reading from the store indicates that somebody else has
258 /// written into the database under our feet.
259 ///
260 /// TODO: this should live in the `OlmMachine`, since it's information
261 /// related to the lock. As of today (2023-07-28), we blow up the entire
262 /// olm machine when there's a generation mismatch. So storing the
263 /// generation in the olm machine would make the client think there's
264 /// *always* a mismatch, and that's why we need to store the generation
265 /// outside the `OlmMachine`.
266 #[cfg(feature = "e2e-encryption")]
267 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
268}
269
270pub(crate) struct ClientInner {
271 /// All the data related to authentication and authorization.
272 pub(crate) auth_ctx: Arc<AuthCtx>,
273
274 /// The URL of the server.
275 ///
276 /// Not to be confused with the `Self::homeserver`. `server` is usually
277 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
278 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
279 /// homeserver (at the time of writing — 2024-08-28).
280 ///
281 /// This value is optional depending on how the `Client` has been built.
282 /// If it's been built from a homeserver URL directly, we don't know the
283 /// server. However, if the `Client` has been built from a server URL or
284 /// name, then the homeserver has been discovered, and we know both.
285 server: Option<Url>,
286
287 /// The URL of the homeserver to connect to.
288 ///
289 /// This is the URL for the client-server Matrix API.
290 homeserver: StdRwLock<Url>,
291
292 /// The sliding sync version.
293 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
294
295 /// The underlying HTTP client.
296 pub(crate) http_client: HttpClient,
297
298 /// User session data.
299 pub(super) base_client: BaseClient,
300
301 /// Collection of in-memory caches for the [`Client`].
302 pub(crate) caches: ClientCaches,
303
304 /// Collection of locks individual client methods might want to use, either
305 /// to ensure that only a single call to a method happens at once or to
306 /// deduplicate multiple calls to a method.
307 pub(crate) locks: ClientLocks,
308
309 /// The cross-process lock configuration.
310 ///
311 /// The SDK provides cross-process store locks (see
312 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
313 /// [`CrossProcessLockConfig::MultiProcess`] is used.
314 ///
315 /// If multiple `Client`s are running in different processes, this
316 /// value MUST be different for each `Client`.
317 cross_process_lock_config: CrossProcessLockConfig,
318
319 /// A mapping of the times at which the current user sent typing notices,
320 /// keyed by room.
321 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
322
323 /// Event handlers. See `add_event_handler`.
324 pub(crate) event_handlers: EventHandlerStore,
325
326 /// Notification handlers. See `register_notification_handler`.
327 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
328
329 /// The sender-side of channels used to receive room updates.
330 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
331
332 /// The sender-side of a channel used to observe all the room updates of a
333 /// sync response.
334 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
335
336 /// Whether the client should update its homeserver URL with the discovery
337 /// information present in the login response.
338 respect_login_well_known: bool,
339
340 /// An event that can be listened on to wait for a successful sync. The
341 /// event will only be fired if a sync loop is running. Can be used for
342 /// synchronization, e.g. if we send out a request to create a room, we can
343 /// wait for the sync to get the data to fetch a room object from the state
344 /// store.
345 pub(crate) sync_beat: event_listener::Event,
346
347 /// A central cache for events, inactive first.
348 ///
349 /// It becomes active when [`EventCache::subscribe`] is called.
350 pub(crate) event_cache: OnceCell<EventCache>,
351
352 /// End-to-end encryption related state.
353 #[cfg(feature = "e2e-encryption")]
354 pub(crate) e2ee: EncryptionData,
355
356 /// The verification state of our own device.
357 #[cfg(feature = "e2e-encryption")]
358 pub(crate) verification_state: SharedObservable<VerificationState>,
359
360 /// Whether to enable the experimental support for sending and receiving
361 /// encrypted room history on invite, per [MSC4268].
362 ///
363 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
364 #[cfg(feature = "e2e-encryption")]
365 pub(crate) enable_share_history_on_invite: bool,
366
367 /// Data related to the [`SendQueue`].
368 ///
369 /// [`SendQueue`]: crate::send_queue::SendQueue
370 pub(crate) send_queue_data: Arc<SendQueueData>,
371
372 /// The `max_upload_size` value of the homeserver, it contains the max
373 /// request size you can send.
374 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
375
376 /// The entry point to get the [`LatestEvent`] of rooms and threads.
377 ///
378 /// [`LatestEvent`]: crate::latest_event::LatestEvent
379 latest_events: OnceCell<LatestEvents>,
380
381 /// Service handling the catching up of thread subscriptions in the
382 /// background.
383 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
384
385 #[cfg(feature = "experimental-search")]
386 /// Handler for [`RoomIndex`]'s of each room
387 search_index: SearchIndex,
388
389 /// A monitor for background tasks spawned by the client.
390 pub(crate) task_monitor: TaskMonitor,
391
392 /// A sender to notify subscribers about duplicate key upload errors
393 /// triggered by requests to /keys/upload.
394 #[cfg(feature = "e2e-encryption")]
395 pub(crate) duplicate_key_upload_error_sender:
396 broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
397}
398
399impl ClientInner {
400 /// Create a new `ClientInner`.
401 ///
402 /// All the fields passed as parameters here are those that must be cloned
403 /// upon instantiation of a sub-client, e.g. a client specialized for
404 /// notifications.
405 #[allow(clippy::too_many_arguments)]
406 async fn new(
407 auth_ctx: Arc<AuthCtx>,
408 server: Option<Url>,
409 homeserver: Url,
410 sliding_sync_version: SlidingSyncVersion,
411 http_client: HttpClient,
412 base_client: BaseClient,
413 supported_versions: CachedValue<TtlValue<SupportedVersions>>,
414 well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
415 respect_login_well_known: bool,
416 event_cache: OnceCell<EventCache>,
417 send_queue: Arc<SendQueueData>,
418 latest_events: OnceCell<LatestEvents>,
419 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
420 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
421 cross_process_lock_config: CrossProcessLockConfig,
422 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
423 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
424 ) -> Arc<Self> {
425 let caches = ClientCaches {
426 supported_versions: Cache::with_value(supported_versions),
427 well_known: Cache::with_value(well_known),
428 server_metadata: Cache::new(),
429 homeserver_capabilities: Cache::new(),
430 };
431
432 let client = Self {
433 server,
434 homeserver: StdRwLock::new(homeserver),
435 auth_ctx,
436 sliding_sync_version: StdRwLock::new(sliding_sync_version),
437 http_client,
438 base_client,
439 caches,
440 locks: Default::default(),
441 cross_process_lock_config,
442 typing_notice_times: Default::default(),
443 event_handlers: Default::default(),
444 notification_handlers: Default::default(),
445 room_update_channels: Default::default(),
446 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
447 // ballast for all observers to catch up.
448 room_updates_sender: broadcast::Sender::new(32),
449 respect_login_well_known,
450 sync_beat: event_listener::Event::new(),
451 event_cache,
452 send_queue_data: send_queue,
453 latest_events,
454 #[cfg(feature = "e2e-encryption")]
455 e2ee: EncryptionData::new(encryption_settings),
456 #[cfg(feature = "e2e-encryption")]
457 verification_state: SharedObservable::new(VerificationState::Unknown),
458 #[cfg(feature = "e2e-encryption")]
459 enable_share_history_on_invite,
460 server_max_upload_size: Mutex::new(OnceCell::new()),
461 #[cfg(feature = "experimental-search")]
462 search_index: search_index_handler,
463 thread_subscription_catchup,
464 task_monitor: TaskMonitor::new(),
465 #[cfg(feature = "e2e-encryption")]
466 duplicate_key_upload_error_sender: broadcast::channel(1).0,
467 };
468
469 #[allow(clippy::let_and_return)]
470 let client = Arc::new(client);
471
472 #[cfg(feature = "e2e-encryption")]
473 client.e2ee.initialize_tasks(&client);
474
475 let init_event_cache = client.event_cache.get_or_init(|| async {
476 EventCache::new(&client, client.base_client.event_cache_store().clone())
477 });
478
479 let init_thread_subscription_catchup = client
480 .thread_subscription_catchup
481 .get_or_init(|| ThreadSubscriptionCatchup::new(Client { inner: client.clone() }));
482
483 let _ = join!(init_event_cache, init_thread_subscription_catchup);
484
485 client
486 }
487}
488
489#[cfg(not(tarpaulin_include))]
490impl Debug for Client {
491 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
492 write!(fmt, "Client")
493 }
494}
495
496impl Client {
497 /// Create a new [`Client`] that will use the given homeserver.
498 ///
499 /// # Arguments
500 ///
501 /// * `homeserver_url` - The homeserver that the client should connect to.
502 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
503 Self::builder().homeserver_url(homeserver_url).build().await
504 }
505
506 /// Returns a subscriber that publishes an event every time the ignore user
507 /// list changes.
508 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
509 self.inner.base_client.subscribe_to_ignore_user_list_changes()
510 }
511
512 /// Create a new [`ClientBuilder`].
513 pub fn builder() -> ClientBuilder {
514 ClientBuilder::new()
515 }
516
517 pub(crate) fn base_client(&self) -> &BaseClient {
518 &self.inner.base_client
519 }
520
521 /// The underlying HTTP client.
522 pub fn http_client(&self) -> &reqwest::Client {
523 &self.inner.http_client.inner
524 }
525
526 pub(crate) fn locks(&self) -> &ClientLocks {
527 &self.inner.locks
528 }
529
530 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
531 &self.inner.auth_ctx
532 }
533
534 /// The cross-process store lock configuration used by this [`Client`].
535 ///
536 /// The SDK provides cross-process store locks (see
537 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
538 /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
539 /// the value used for all cross-process store locks used by this
540 /// `Client`.
541 pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
542 &self.inner.cross_process_lock_config
543 }
544
545 /// Change the homeserver URL used by this client.
546 ///
547 /// # Arguments
548 ///
549 /// * `homeserver_url` - The new URL to use.
550 fn set_homeserver(&self, homeserver_url: Url) {
551 *self.inner.homeserver.write().unwrap() = homeserver_url;
552 }
553
/// Switch to another homeserver, then resolve well-known again.
///
/// The given URL is set as the homeserver first and the well-known is
/// reset. If a well-known is then available, the homeserver is set once
/// more, to the base URL the well-known advertises.
///
/// # Errors
///
/// Fails if resetting the well-known fails, or if the advertised base URL
/// can't be parsed. In both cases `homeserver_url` has already been set.
#[cfg(feature = "e2e-encryption")]
pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
    &self,
    homeserver_url: Url,
) -> Result<()> {
    self.set_homeserver(homeserver_url);
    self.reset_well_known().await?;

    // Without a well-known, the URL set above is the final one.
    let Some(well_known) = self.well_known().await else {
        return Ok(());
    };

    let advertised = Url::parse(&well_known.homeserver.base_url)?;
    self.set_homeserver(advertised);

    Ok(())
}
567
568 /// Retrieves a helper component to access the [`HomeserverCapabilities`]
569 /// supported or disabled by the homeserver.
570 pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
571 HomeserverCapabilities::new(self.clone())
572 }
573
/// Fetch the vendor information of the server from the federation API.
///
/// The `/_matrix/federation/v1/version` endpoint is called to learn both
/// the software name and the version of the server. A field missing from
/// the response is reported as `"unknown"`.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::Client;
/// # use url::Url;
/// # async {
/// # let homeserver = Url::parse("http://example.com")?;
/// let client = Client::new(homeserver).await?;
///
/// let server_info = client.server_vendor_info(None).await?;
/// println!(
///     "Server: {}, Version: {}",
///     server_info.server_name, server_info.version
/// );
/// # anyhow::Ok(()) };
/// ```
#[cfg(feature = "federation-api")]
pub async fn server_vendor_info(
    &self,
    request_config: Option<RequestConfig>,
) -> HttpResult<ServerVendorInfo> {
    use ruma::api::federation::discovery::get_server_version;

    let request = get_server_version::v1::Request::new();
    let response = self.send_inner(request, request_config, Default::default()).await?;

    // Any part of the server info may be absent from the response.
    let server = response.server.unwrap_or_default();
    let unknown = || "unknown".to_owned();

    Ok(ServerVendorInfo {
        server_name: server.name.unwrap_or_else(unknown),
        version: server.version.unwrap_or_else(unknown),
    })
}
613
614 /// Get a copy of the default request config.
615 ///
616 /// The default request config is what's used when sending requests if no
617 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
618 /// function with such a parameter.
619 ///
620 /// If the default request config was not customized through
621 /// [`ClientBuilder`] when creating this `Client`, the returned value will
622 /// be equivalent to [`RequestConfig::default()`].
623 pub fn request_config(&self) -> RequestConfig {
624 self.inner.http_client.request_config
625 }
626
627 /// Check whether the client has been activated.
628 ///
629 /// A client is considered active when:
630 ///
631 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
632 /// is logged in,
633 /// 2. Has loaded cached data from storage,
634 /// 3. If encryption is enabled, it also initialized or restored its
635 /// `OlmMachine`.
636 pub fn is_active(&self) -> bool {
637 self.inner.base_client.is_active()
638 }
639
640 /// The server used by the client.
641 ///
642 /// See `Self::server` to learn more.
643 pub fn server(&self) -> Option<&Url> {
644 self.inner.server.as_ref()
645 }
646
647 /// The homeserver of the client.
648 pub fn homeserver(&self) -> Url {
649 self.inner.homeserver.read().unwrap().clone()
650 }
651
652 /// Get the sliding sync version.
653 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
654 self.inner.sliding_sync_version.read().unwrap().clone()
655 }
656
657 /// Override the sliding sync version.
658 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
659 let mut lock = self.inner.sliding_sync_version.write().unwrap();
660 *lock = version;
661 }
662
663 /// Get the Matrix user session meta information.
664 ///
665 /// If the client is currently logged in, this will return a
666 /// [`SessionMeta`] object which contains the user ID and device ID.
667 /// Otherwise it returns `None`.
668 pub fn session_meta(&self) -> Option<&SessionMeta> {
669 self.base_client().session_meta()
670 }
671
672 /// Returns a receiver that gets events for each room info update. To watch
673 /// for new events, use `receiver.resubscribe()`.
674 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
675 self.base_client().room_info_notable_update_receiver()
676 }
677
678 /// Performs a search for users.
679 /// The search is performed case-insensitively on user IDs and display names
680 ///
681 /// # Arguments
682 ///
683 /// * `search_term` - The search term for the search
684 /// * `limit` - The maximum number of results to return. Defaults to 10.
685 ///
686 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
687 pub async fn search_users(
688 &self,
689 search_term: &str,
690 limit: u64,
691 ) -> HttpResult<search_users::v3::Response> {
692 let mut request = search_users::v3::Request::new(search_term.to_owned());
693
694 if let Some(limit) = UInt::new(limit) {
695 request.limit = limit;
696 }
697
698 self.send(request).await
699 }
700
701 /// Get the user id of the current owner of the client.
702 pub fn user_id(&self) -> Option<&UserId> {
703 self.session_meta().map(|s| s.user_id.as_ref())
704 }
705
706 /// Get the device ID that identifies the current session.
707 pub fn device_id(&self) -> Option<&DeviceId> {
708 self.session_meta().map(|s| s.device_id.as_ref())
709 }
710
711 /// Get the current access token for this session.
712 ///
713 /// Will be `None` if the client has not been logged in.
714 pub fn access_token(&self) -> Option<String> {
715 self.auth_ctx().access_token()
716 }
717
718 /// Get the current tokens for this session.
719 ///
720 /// To be notified of changes in the session tokens, use
721 /// [`Client::subscribe_to_session_changes()`] or
722 /// [`Client::set_session_callbacks()`].
723 ///
724 /// Returns `None` if the client has not been logged in.
725 pub fn session_tokens(&self) -> Option<SessionTokens> {
726 self.auth_ctx().session_tokens()
727 }
728
729 /// Access the authentication API used to log in this client.
730 ///
731 /// Will be `None` if the client has not been logged in.
732 pub fn auth_api(&self) -> Option<AuthApi> {
733 match self.auth_ctx().auth_data.get()? {
734 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
735 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
736 }
737 }
738
739 /// Get the whole session info of this client.
740 ///
741 /// Will be `None` if the client has not been logged in.
742 ///
743 /// Can be used with [`Client::restore_session`] to restore a previously
744 /// logged-in session.
745 pub fn session(&self) -> Option<AuthSession> {
746 match self.auth_api()? {
747 AuthApi::Matrix(api) => api.session().map(Into::into),
748 AuthApi::OAuth(api) => api.full_session().map(Into::into),
749 }
750 }
751
752 /// Get a reference to the state store.
753 pub fn state_store(&self) -> &DynStateStore {
754 self.base_client().state_store()
755 }
756
757 /// Get a reference to the event cache store.
758 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
759 self.base_client().event_cache_store()
760 }
761
762 /// Get a reference to the media store.
763 pub fn media_store(&self) -> &MediaStoreLock {
764 self.base_client().media_store()
765 }
766
767 /// Access the native Matrix authentication API with this client.
768 pub fn matrix_auth(&self) -> MatrixAuth {
769 MatrixAuth::new(self.clone())
770 }
771
772 /// Get the account of the current owner of the client.
773 pub fn account(&self) -> Account {
774 Account::new(self.clone())
775 }
776
/// Get the encryption manager of the client, backed by a clone of this
/// client.
#[cfg(feature = "e2e-encryption")]
pub fn encryption(&self) -> Encryption {
    let client = self.clone();
    Encryption::new(client)
}
782
783 /// Get the media manager of the client.
784 pub fn media(&self) -> Media {
785 Media::new(self.clone())
786 }
787
788 /// Get the pusher manager of the client.
789 pub fn pusher(&self) -> Pusher {
790 Pusher::new(self.clone())
791 }
792
793 /// Access the OAuth 2.0 API of the client.
794 pub fn oauth(&self) -> OAuth {
795 OAuth::new(self.clone())
796 }
797
798 /// Register a handler for a specific event type.
799 ///
800 /// The handler is a function or closure with one or more arguments. The
801 /// first argument is the event itself. All additional arguments are
802 /// "context" arguments: They have to implement [`EventHandlerContext`].
803 /// This trait is named that way because most of the types implementing it
804 /// give additional context about an event: The room it was in, its raw form
805 /// and other similar things. As two exceptions to this,
806 /// [`Client`] and [`EventHandlerHandle`] also implement the
807 /// `EventHandlerContext` trait so you don't have to clone your client
808 /// into the event handler manually and a handler can decide to remove
809 /// itself.
810 ///
811 /// Some context arguments are not universally applicable. A context
812 /// argument that isn't available for the given event type will result in
813 /// the event handler being skipped and an error being logged. The following
814 /// context argument types are only available for a subset of event types:
815 ///
816 /// * [`Room`] is only available for room-specific events, i.e. not for
817 /// events like global account data events or presence events.
818 ///
819 /// You can provide custom context via
820 /// [`add_event_handler_context`](Client::add_event_handler_context) and
821 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
822 /// into the event handler.
823 ///
824 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
825 ///
826 /// # Examples
827 ///
828 /// ```no_run
829 /// use matrix_sdk::{
830 /// deserialized_responses::EncryptionInfo,
831 /// event_handler::Ctx,
832 /// ruma::{
833 /// events::{
834 /// macros::EventContent,
835 /// push_rules::PushRulesEvent,
836 /// room::{
837 /// message::SyncRoomMessageEvent,
838 /// topic::SyncRoomTopicEvent,
839 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
840 /// },
841 /// },
842 /// push::Action,
843 /// Int, MilliSecondsSinceUnixEpoch,
844 /// },
845 /// Client, Room,
846 /// };
847 /// use serde::{Deserialize, Serialize};
848 ///
849 /// # async fn example(client: Client) {
850 /// client.add_event_handler(
851 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
852 /// // Common usage: Room event plus room and client.
853 /// },
854 /// );
855 /// client.add_event_handler(
856 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
857 /// async move {
858 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
859 /// // unencrypted events and events that were decrypted by the SDK.
860 /// }
861 /// },
862 /// );
863 /// client.add_event_handler(
864 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
865 /// async move {
866 /// // A `Vec<Action>` parameter allows you to know which push actions
867 /// // are applicable for an event. For example, an event with
868 /// // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
869 /// // should be highlighted in the timeline.
870 /// }
871 /// },
872 /// );
873 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
874 /// // You can omit any or all arguments after the first.
875 /// });
876 ///
877 /// // Registering a temporary event handler:
878 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
879 /// /* Event handler */
880 /// });
881 /// client.remove_event_handler(handle);
882 ///
883 /// // Registering custom event handler context:
884 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
885 /// struct MyContext {
886 /// number: usize,
887 /// }
888 /// client.add_event_handler_context(MyContext { number: 5 });
889 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
890 /// // Use the context
891 /// });
892 ///
893 /// // This will handle membership events in joined rooms. Invites are special, see below.
894 /// client.add_event_handler(
895 /// |ev: SyncRoomMemberEvent| async move {},
896 /// );
897 ///
898 /// // To handle state events in invited rooms (including invite membership events),
899 /// // `StrippedRoomMemberEvent` should be used.
900 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
901 /// client.add_event_handler(
902 /// |ev: StrippedRoomMemberEvent| async move {},
903 /// );
904 ///
905 /// // Custom events work exactly the same way, you just need to declare
906 /// // the content struct and use the EventContent derive macro on it.
907 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
908 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
909 /// struct TokenEventContent {
910 /// token: String,
911 /// #[serde(rename = "exp")]
912 /// expires_at: MilliSecondsSinceUnixEpoch,
913 /// }
914 ///
915 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
916 /// todo!("Display the token");
917 /// });
918 ///
919 /// // Event handler closures can also capture local variables.
920 /// // Make sure they are cheap to clone though, because they will be cloned
921 /// // every time the closure is called.
922 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
923 ///
924 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
925 /// println!("Calling the handler with identifier {data}");
926 /// });
927 /// # }
928 /// ```
929 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
930 where
931 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
932 H: EventHandler<Ev, Ctx>,
933 {
934 self.add_event_handler_impl(handler, None)
935 }
936
937 /// Register a handler for a specific room, and event type.
938 ///
939 /// This method works the same way as
940 /// [`add_event_handler`][Self::add_event_handler], except that the handler
941 /// will only be called for events in the room with the specified ID. See
942 /// that method for more details on event handler functions.
943 ///
944 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
945 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
946 /// your use case.
947 pub fn add_room_event_handler<Ev, Ctx, H>(
948 &self,
949 room_id: &RoomId,
950 handler: H,
951 ) -> EventHandlerHandle
952 where
953 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
954 H: EventHandler<Ev, Ctx>,
955 {
956 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
957 }
958
959 /// Observe a specific event type.
960 ///
961 /// `Ev` represents the kind of event that will be observed. `Ctx`
962 /// represents the context that will come with the event. It relies on the
963 /// same mechanism as [`Client::add_event_handler`]. The main difference is
964 /// that it returns an [`ObservableEventHandler`] and doesn't require a
965 /// user-defined closure. It is possible to subscribe to the
966 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
967 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
968 /// Ctx)`.
969 ///
970 /// Be careful that only the most recent value can be observed. Subscribers
971 /// are notified when a new value is sent, but there is no guarantee
972 /// that they will see all values.
973 ///
974 /// # Example
975 ///
976 /// Let's see a classical usage:
977 ///
978 /// ```
979 /// use futures_util::StreamExt as _;
980 /// use matrix_sdk::{
981 /// Client, Room,
982 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
983 /// };
984 ///
985 /// # async fn example(client: Client) -> Option<()> {
986 /// let observer =
987 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
988 ///
989 /// let mut subscriber = observer.subscribe();
990 ///
991 /// let (event, (room, push_actions)) = subscriber.next().await?;
992 /// # Some(())
993 /// # }
994 /// ```
995 ///
996 /// Now let's see how to get several contexts that can be useful for you:
997 ///
998 /// ```
999 /// use matrix_sdk::{
1000 /// Client, Room,
1001 /// deserialized_responses::EncryptionInfo,
1002 /// ruma::{
1003 /// events::room::{
1004 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1005 /// },
1006 /// push::Action,
1007 /// },
1008 /// };
1009 ///
1010 /// # async fn example(client: Client) {
1011 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1012 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1013 ///
1014 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1015 /// // to distinguish between unencrypted events and events that were decrypted
1016 /// // by the SDK.
1017 /// let _ = client
1018 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1019 /// );
1020 ///
1021 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1022 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1023 /// // should be highlighted in the timeline.
1024 /// let _ =
1025 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1026 ///
1027 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1028 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1029 /// # }
1030 /// ```
1031 ///
1032 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1033 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1034 where
1035 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1036 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1037 {
1038 self.observe_room_events_impl(None)
1039 }
1040
1041 /// Observe a specific room, and event type.
1042 ///
1043 /// This method works the same way as [`Client::observe_events`], except
1044 /// that the observability will only be applied for events in the room with
1045 /// the specified ID. See that method for more details.
1046 ///
1047 /// Be careful that only the most recent value can be observed. Subscribers
1048 /// are notified when a new value is sent, but there is no guarantee
1049 /// that they will see all values.
1050 pub fn observe_room_events<Ev, Ctx>(
1051 &self,
1052 room_id: &RoomId,
1053 ) -> ObservableEventHandler<(Ev, Ctx)>
1054 where
1055 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1056 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1057 {
1058 self.observe_room_events_impl(Some(room_id.to_owned()))
1059 }
1060
1061 /// Shared implementation for `Client::observe_events` and
1062 /// `Client::observe_room_events`.
1063 fn observe_room_events_impl<Ev, Ctx>(
1064 &self,
1065 room_id: Option<OwnedRoomId>,
1066 ) -> ObservableEventHandler<(Ev, Ctx)>
1067 where
1068 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1069 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1070 {
1071 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1072 // new value.
1073 let shared_observable = SharedObservable::new(None);
1074
1075 ObservableEventHandler::new(
1076 shared_observable.clone(),
1077 self.event_handler_drop_guard(self.add_event_handler_impl(
1078 move |event: Ev, context: Ctx| {
1079 shared_observable.set(Some((event, context)));
1080
1081 ready(())
1082 },
1083 room_id,
1084 )),
1085 )
1086 }
1087
1088 /// Subscribe to future `beacon_info` updates for the current user across
1089 /// all rooms.
1090 ///
1091 /// This stream is push-only: it emits only future updates observed during
1092 /// sync processing and does not replay existing state.
1093 pub fn observe_own_beacon_info_updates(
1094 &self,
1095 ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1096 let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1097 let mut stream = observer.subscribe();
1098 let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1099 Ok(async_stream::stream! {
1100 let _observer = observer;
1101
1102 while let Some((event, room)) = stream.next().await {
1103 if event.state_key != own_user_id {
1104 continue;
1105 }
1106 yield BeaconInfoUpdate {
1107 room_id: room.room_id().to_owned(),
1108 event_id: event.event_id,
1109 content: event.content,
1110 };
1111 }
1112 })
1113 }
1114
1115 /// Remove the event handler associated with the handle.
1116 ///
1117 /// Note that you **must not** call `remove_event_handler` from the
1118 /// non-async part of an event handler, that is:
1119 ///
1120 /// ```ignore
1121 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1122 /// // ⚠ this will cause a deadlock ⚠
1123 /// client.remove_event_handler(handle);
1124 ///
1125 /// async move {
1126 /// // removing the event handler here is fine
1127 /// client.remove_event_handler(handle);
1128 /// }
1129 /// })
1130 /// ```
1131 ///
1132 /// Note also that handlers that remove themselves will still execute with
1133 /// events received in the same sync cycle.
1134 ///
1135 /// # Arguments
1136 ///
1137 /// `handle` - The [`EventHandlerHandle`] that is returned when
1138 /// registering the event handler with [`Client::add_event_handler`].
1139 ///
1140 /// # Examples
1141 ///
1142 /// ```no_run
1143 /// # use url::Url;
1144 /// # use tokio::sync::mpsc;
1145 /// #
1146 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1147 /// #
1148 /// use matrix_sdk::{
1149 /// Client, event_handler::EventHandlerHandle,
1150 /// ruma::events::room::member::SyncRoomMemberEvent,
1151 /// };
1152 /// #
1153 /// # futures_executor::block_on(async {
1154 /// # let client = matrix_sdk::Client::builder()
1155 /// # .homeserver_url(homeserver)
1156 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1157 /// # .build()
1158 /// # .await
1159 /// # .unwrap();
1160 ///
1161 /// client.add_event_handler(
1162 /// |ev: SyncRoomMemberEvent,
1163 /// client: Client,
1164 /// handle: EventHandlerHandle| async move {
1165 /// // Common usage: Check arriving Event is the expected one
1166 /// println!("Expected RoomMemberEvent received!");
1167 /// client.remove_event_handler(handle);
1168 /// },
1169 /// );
1170 /// # });
1171 /// ```
1172 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1173 self.inner.event_handlers.remove(handle);
1174 }
1175
1176 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1177 /// the given handle.
1178 ///
1179 /// When the returned value is dropped, the event handler will be removed.
1180 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1181 EventHandlerDropGuard::new(handle, self.clone())
1182 }
1183
1184 /// Add an arbitrary value for use as event handler context.
1185 ///
1186 /// The value can be obtained in an event handler by adding an argument of
1187 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1188 ///
1189 /// If a value of the same type has been added before, it will be
1190 /// overwritten.
1191 ///
1192 /// # Examples
1193 ///
1194 /// ```no_run
1195 /// use matrix_sdk::{
1196 /// Room, event_handler::Ctx,
1197 /// ruma::events::room::message::SyncRoomMessageEvent,
1198 /// };
1199 /// # #[derive(Clone)]
1200 /// # struct SomeType;
1201 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1202 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1203 /// # futures_executor::block_on(async {
1204 /// # let client = matrix_sdk::Client::builder()
1205 /// # .homeserver_url(homeserver)
1206 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1207 /// # .build()
1208 /// # .await
1209 /// # .unwrap();
1210 ///
1211 /// // Handle used to send messages to the UI part of the app
1212 /// let my_gui_handle: SomeType = obtain_gui_handle();
1213 ///
1214 /// client.add_event_handler_context(my_gui_handle.clone());
1215 /// client.add_event_handler(
1216 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1217 /// async move {
1218 /// // gui_handle.send(DisplayMessage { message: ev });
1219 /// }
1220 /// },
1221 /// );
1222 /// # });
1223 /// ```
1224 pub fn add_event_handler_context<T>(&self, ctx: T)
1225 where
1226 T: Clone + Send + Sync + 'static,
1227 {
1228 self.inner.event_handlers.add_context(ctx);
1229 }
1230
1231 /// Register a handler for a notification.
1232 ///
1233 /// Similar to [`Client::add_event_handler`], but only allows functions
1234 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1235 /// [`Client`] for now.
1236 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1237 where
1238 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1239 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1240 {
1241 self.inner.notification_handlers.write().await.push(Box::new(
1242 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1243 ));
1244
1245 self
1246 }
1247
1248 /// Subscribe to all updates for the room with the given ID.
1249 ///
1250 /// The returned receiver will receive a new message for each sync response
1251 /// that contains updates for that room.
1252 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1253 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1254 btree_map::Entry::Vacant(entry) => {
1255 let (tx, rx) = broadcast::channel(8);
1256 entry.insert(tx);
1257 rx
1258 }
1259 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1260 }
1261 }
1262
1263 /// Subscribe to all updates to all rooms, whenever any has been received in
1264 /// a sync response.
1265 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1266 self.inner.room_updates_sender.subscribe()
1267 }
1268
1269 pub(crate) async fn notification_handlers(
1270 &self,
1271 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1272 self.inner.notification_handlers.read().await
1273 }
1274
1275 /// Get all the rooms the client knows about.
1276 ///
1277 /// This will return the list of joined, invited, and left rooms.
1278 pub fn rooms(&self) -> Vec<Room> {
1279 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1280 }
1281
1282 /// Get all the rooms the client knows about, filtered by room state.
1283 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1284 self.base_client()
1285 .rooms_filtered(filter)
1286 .into_iter()
1287 .map(|room| Room::new(self.clone(), room))
1288 .collect()
1289 }
1290
1291 /// Get a stream of all the rooms, in addition to the existing rooms.
1292 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1293 let (rooms, stream) = self.base_client().rooms_stream();
1294
1295 let map_room = |room| Room::new(self.clone(), room);
1296
1297 (
1298 rooms.into_iter().map(map_room).collect(),
1299 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1300 )
1301 }
1302
1303 /// Returns the joined rooms this client knows about.
1304 pub fn joined_rooms(&self) -> Vec<Room> {
1305 self.rooms_filtered(RoomStateFilter::JOINED)
1306 }
1307
1308 /// Returns the invited rooms this client knows about.
1309 pub fn invited_rooms(&self) -> Vec<Room> {
1310 self.rooms_filtered(RoomStateFilter::INVITED)
1311 }
1312
1313 /// Returns the left rooms this client knows about.
1314 pub fn left_rooms(&self) -> Vec<Room> {
1315 self.rooms_filtered(RoomStateFilter::LEFT)
1316 }
1317
1318 /// Returns the joined space rooms this client knows about.
1319 pub fn joined_space_rooms(&self) -> Vec<Room> {
1320 self.base_client()
1321 .rooms_filtered(RoomStateFilter::JOINED)
1322 .into_iter()
1323 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1324 .collect()
1325 }
1326
1327 /// Get a room with the given room id.
1328 ///
1329 /// # Arguments
1330 ///
1331 /// `room_id` - The unique id of the room that should be fetched.
1332 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1333 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1334 }
1335
1336 /// Gets the preview of a room, whether the current user has joined it or
1337 /// not.
1338 pub async fn get_room_preview(
1339 &self,
1340 room_or_alias_id: &RoomOrAliasId,
1341 via: Vec<OwnedServerName>,
1342 ) -> Result<RoomPreview> {
1343 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1344 Ok(room_id) => room_id.to_owned(),
1345 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1346 };
1347
1348 if let Some(room) = self.get_room(&room_id) {
1349 // The cached data can only be trusted if the room state is joined or
1350 // banned: for invite and knock rooms, no updates will be received
1351 // for the rooms after the invite/knock action took place so we may
1352 // have very out to date data for important fields such as
1353 // `join_rule`. For left rooms, the homeserver should return the latest info.
1354 match room.state() {
1355 RoomState::Joined | RoomState::Banned => {
1356 return Ok(RoomPreview::from_known_room(&room).await);
1357 }
1358 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1359 }
1360 }
1361
1362 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1363 }
1364
1365 /// Resolve a room alias to a room id and a list of servers which know
1366 /// about it.
1367 ///
1368 /// # Arguments
1369 ///
1370 /// `room_alias` - The room alias to be resolved.
1371 pub async fn resolve_room_alias(
1372 &self,
1373 room_alias: &RoomAliasId,
1374 ) -> HttpResult<get_alias::v3::Response> {
1375 let request = get_alias::v3::Request::new(room_alias.to_owned());
1376 self.send(request).await
1377 }
1378
1379 /// Checks if a room alias is not in use yet.
1380 ///
1381 /// Returns:
1382 /// - `Ok(true)` if the room alias is available.
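    /// - `Ok(false)` if it's not (the alias was successfully resolved to a
    ///   room). Note that a "not found" (`404`) error from the resolve alias
    ///   request means the alias is available, i.e. `Ok(true)`.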
1385 /// - An `Err` otherwise.
1386 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1387 match self.resolve_room_alias(alias).await {
1388 // The room alias was resolved, so it's already in use.
1389 Ok(_) => Ok(false),
1390 Err(error) => {
1391 match error.client_api_error_kind() {
1392 // The room alias wasn't found, so it's available.
1393 Some(ErrorKind::NotFound) => Ok(true),
1394 _ => Err(error),
1395 }
1396 }
1397 }
1398 }
1399
1400 /// Adds a new room alias associated with a room to the room directory.
1401 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1402 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1403 self.send(request).await?;
1404 Ok(())
1405 }
1406
1407 /// Removes a room alias from the room directory.
1408 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1409 let request = delete_alias::v3::Request::new(alias.to_owned());
1410 self.send(request).await?;
1411 Ok(())
1412 }
1413
1414 /// Update the homeserver from the login response well-known if needed.
1415 ///
1416 /// # Arguments
1417 ///
1418 /// * `login_well_known` - The `well_known` field from a successful login
1419 /// response.
1420 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1421 if self.inner.respect_login_well_known
1422 && let Some(well_known) = login_well_known
1423 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1424 {
1425 self.set_homeserver(homeserver);
1426 }
1427 }
1428
1429 /// Similar to [`Client::restore_session_with`], with
1430 /// [`RoomLoadSettings::default()`].
1431 ///
1432 /// # Panics
1433 ///
1434 /// Panics if a session was already restored or logged in.
1435 #[instrument(skip_all)]
1436 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1437 self.restore_session_with(session, RoomLoadSettings::default()).await
1438 }
1439
1440 /// Restore a session previously logged-in using one of the available
1441 /// authentication APIs. The number of rooms to restore is controlled by
1442 /// [`RoomLoadSettings`].
1443 ///
1444 /// See the documentation of the corresponding authentication API's
1445 /// `restore_session` method for more information.
1446 ///
1447 /// # Panics
1448 ///
1449 /// Panics if a session was already restored or logged in.
1450 #[instrument(skip_all)]
1451 pub async fn restore_session_with(
1452 &self,
1453 session: impl Into<AuthSession>,
1454 room_load_settings: RoomLoadSettings,
1455 ) -> Result<()> {
1456 let session = session.into();
1457 match session {
1458 AuthSession::Matrix(session) => {
1459 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1460 }
1461 AuthSession::OAuth(session) => {
1462 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1463 }
1464 }
1465 }
1466
1467 /// Refresh the access token using the authentication API used to log into
1468 /// this session.
1469 ///
1470 /// See the documentation of the authentication API's `refresh_access_token`
1471 /// method for more information.
1472 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1473 let Some(auth_api) = self.auth_api() else {
1474 return Err(RefreshTokenError::RefreshTokenRequired);
1475 };
1476
1477 match auth_api {
1478 AuthApi::Matrix(api) => {
1479 trace!("Token refresh: Using the homeserver.");
1480 Box::pin(api.refresh_access_token()).await?;
1481 }
1482 AuthApi::OAuth(api) => {
1483 trace!("Token refresh: Using OAuth 2.0.");
1484 Box::pin(api.refresh_access_token()).await?;
1485 }
1486 }
1487
1488 Ok(())
1489 }
1490
1491 /// Log out the current session using the proper authentication API.
1492 ///
1493 /// # Errors
1494 ///
1495 /// Returns an error if the session is not authenticated or if an error
1496 /// occurred while making the request to the server.
1497 pub async fn logout(&self) -> Result<(), Error> {
1498 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1499 match auth_api {
1500 AuthApi::Matrix(matrix_auth) => {
1501 matrix_auth.logout().await?;
1502 Ok(())
1503 }
1504 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1505 }
1506 }
1507
1508 /// Get or upload a sync filter.
1509 ///
1510 /// This method will either get a filter ID from the store or upload the
1511 /// filter definition to the homeserver and return the new filter ID.
1512 ///
1513 /// # Arguments
1514 ///
1515 /// * `filter_name` - The unique name of the filter, this name will be used
1516 /// locally to store and identify the filter ID returned by the server.
1517 ///
1518 /// * `definition` - The filter definition that should be uploaded to the
1519 /// server if no filter ID can be found in the store.
1520 ///
1521 /// # Examples
1522 ///
1523 /// ```no_run
1524 /// # use matrix_sdk::{
1525 /// # Client, config::SyncSettings,
1526 /// # ruma::api::client::{
1527 /// # filter::{
1528 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1529 /// # },
1530 /// # sync::sync_events::v3::Filter,
1531 /// # }
1532 /// # };
1533 /// # use url::Url;
1534 /// # async {
1535 /// # let homeserver = Url::parse("http://example.com").unwrap();
1536 /// # let client = Client::new(homeserver).await.unwrap();
1537 /// let mut filter = FilterDefinition::default();
1538 ///
1539 /// // Let's enable member lazy loading.
1540 /// filter.room.state.lazy_load_options =
1541 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1542 ///
1543 /// let filter_id = client
1544 /// .get_or_upload_filter("sync", filter)
1545 /// .await
1546 /// .unwrap();
1547 ///
1548 /// let sync_settings = SyncSettings::new()
1549 /// .filter(Filter::FilterId(filter_id));
1550 ///
1551 /// let response = client.sync_once(sync_settings).await.unwrap();
    /// # };
    /// ```
1553 #[instrument(skip(self, definition))]
1554 pub async fn get_or_upload_filter(
1555 &self,
1556 filter_name: &str,
1557 definition: FilterDefinition,
1558 ) -> Result<String> {
1559 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1560 debug!("Found filter locally");
1561 Ok(filter)
1562 } else {
1563 debug!("Didn't find filter locally");
1564 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1565 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1566 let response = self.send(request).await?;
1567
1568 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1569
1570 Ok(response.filter_id)
1571 }
1572 }
1573
1574 /// Prepare to join a room by ID, by getting the current details about it
1575 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1576 let room = self.get_room(room_id)?;
1577
1578 let inviter = match room.invite_details().await {
1579 Ok(details) => details.inviter,
1580 Err(Error::WrongRoomState(_)) => None,
1581 Err(e) => {
1582 warn!("Error fetching invite details for room: {e:?}");
1583 None
1584 }
1585 };
1586
1587 Some(PreJoinRoomInfo { inviter })
1588 }
1589
1590 /// Finish joining a room.
1591 ///
1592 /// If the room was an invite that should be marked as a DM, will include it
1593 /// in the DM event after creating the joined room.
1594 ///
1595 /// If encrypted history sharing is enabled, will check to see if we have a
1596 /// key bundle, and import it if so.
1597 ///
1598 /// # Arguments
1599 ///
1600 /// * `room_id` - The `RoomId` of the room that was joined.
1601 /// * `pre_join_room_info` - Information about the room before we joined.
    async fn finish_join_room(
        &self,
        room_id: &RoomId,
        pre_join_room_info: Option<PreJoinRoomInfo>,
    ) -> Result<Room> {
        info!(?room_id, ?pre_join_room_info, "Completing room join");
        // Decide up front whether the room should be flagged as a DM: only if
        // the locally known room is currently an invite *and* is reported as
        // direct. A failing `is_direct()` is logged and treated as "not
        // direct". This is evaluated before `room_joined` below is called.
        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
            room.state() == RoomState::Invited
                && room.is_direct().await.unwrap_or_else(|e| {
                    warn!(%room_id, "is_direct() failed: {e}");
                    false
                })
        } else {
            false
        };

        // Tell the base client about the join, passing along the inviter's
        // user ID when we have one.
        let base_room = self
            .base_client()
            .room_joined(
                room_id,
                pre_join_room_info
                    .as_ref()
                    .and_then(|info| info.inviter.as_ref())
                    .map(|i| i.user_id().to_owned()),
            )
            .await?;
        let room = Room::new(self.clone(), base_room);

        // NOTE(review): an error here is propagated to the caller even though
        // the join itself has already gone through — confirm this is intended.
        if mark_as_dm {
            room.set_is_direct(true).await?;
        }

        // If we joined following an invite, check if we had previously received a key
        // bundle from the inviter, and import it if so.
        //
        // It's important that we only do this once `BaseClient::room_joined` has
        // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
        // race.
        #[cfg(feature = "e2e-encryption")]
        if self.inner.enable_share_history_on_invite
            && let Some(inviter) =
                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
        {
            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
                .await?;
        }

        // Suppress "unused variable" and "unused field" lints
        #[cfg(not(feature = "e2e-encryption"))]
        let _ = pre_join_room_info.map(|i| i.inviter);

        Ok(room)
    }
1655
1656 /// Join a room by `RoomId`.
1657 ///
1658 /// Returns the `Room` in the joined state.
1659 ///
1660 /// # Arguments
1661 ///
1662 /// * `room_id` - The `RoomId` of the room to be joined.
1663 #[instrument(skip(self))]
1664 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1665 // See who invited us to this room, if anyone. Note we have to do this before
1666 // making the `/join` request, otherwise we could race against the sync.
1667 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1668
1669 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1670 let response = self.send(request).await?;
1671 self.finish_join_room(&response.room_id, pre_join_info).await
1672 }
1673
1674 /// Join a room by `RoomOrAliasId`.
1675 ///
1676 /// Returns the `Room` in the joined state.
1677 ///
1678 /// # Arguments
1679 ///
1680 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1681 /// alias looks like `#name:example.com`.
1682 /// * `server_names` - The server names to be used for resolving the alias,
1683 /// if needs be.
1684 #[instrument(skip(self))]
1685 pub async fn join_room_by_id_or_alias(
1686 &self,
1687 alias: &RoomOrAliasId,
1688 server_names: &[OwnedServerName],
1689 ) -> Result<Room> {
1690 let pre_join_info = {
1691 match alias.try_into() {
1692 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1693 Err(_) => {
1694 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1695 // responding to an invitation to the room, and therefore don't need to handle
1696 // things that happen as a result of invites.
1697 None
1698 }
1699 }
1700 };
1701 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1702 via: server_names.to_owned(),
1703 });
1704 let response = self.send(request).await?;
1705 self.finish_join_room(&response.room_id, pre_join_info).await
1706 }
1707
1708 /// Search the homeserver's directory of public rooms.
1709 ///
    /// Sends a request to the homeserver's `publicRooms` endpoint and returns
    /// a `get_public_rooms::v3::Response`.
1712 ///
1713 /// # Arguments
1714 ///
1715 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1716 ///
1717 /// * `since` - Pagination token from a previous request.
1718 ///
1719 /// * `server` - The name of the server, if `None` the requested server is
1720 /// used.
1721 ///
1722 /// # Examples
1723 /// ```no_run
1724 /// use matrix_sdk::Client;
1725 /// # use url::Url;
1726 /// # let homeserver = Url::parse("http://example.com").unwrap();
1727 /// # let limit = Some(10);
1728 /// # let since = Some("since token");
1729 /// # let server = Some("servername.com".try_into().unwrap());
1730 /// # async {
1731 /// let mut client = Client::new(homeserver).await.unwrap();
1732 ///
1733 /// client.public_rooms(limit, since, server).await;
1734 /// # };
1735 /// ```
1736 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1737 pub async fn public_rooms(
1738 &self,
1739 limit: Option<u32>,
1740 since: Option<&str>,
1741 server: Option<&ServerName>,
1742 ) -> HttpResult<get_public_rooms::v3::Response> {
1743 let limit = limit.map(UInt::from);
1744
1745 let request = assign!(get_public_rooms::v3::Request::new(), {
1746 limit,
1747 since: since.map(ToOwned::to_owned),
1748 server: server.map(ToOwned::to_owned),
1749 });
1750 self.send(request).await
1751 }
1752
1753 /// Create a room with the given parameters.
1754 ///
1755 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1756 /// created room.
1757 ///
1758 /// If you want to create a direct message with one specific user, you can
1759 /// use [`create_dm`][Self::create_dm], which is more convenient than
1760 /// assembling the [`create_room::v3::Request`] yourself.
1761 ///
1762 /// If the `is_direct` field of the request is set to `true` and at least
1763 /// one user is invited, the room will be automatically added to the direct
1764 /// rooms in the account data.
1765 ///
1766 /// # Examples
1767 ///
1768 /// ```no_run
1769 /// use matrix_sdk::{
1770 /// Client,
1771 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1772 /// };
1773 /// # use url::Url;
1774 /// #
1775 /// # async {
1776 /// # let homeserver = Url::parse("http://example.com").unwrap();
1777 /// let request = CreateRoomRequest::new();
1778 /// let client = Client::new(homeserver).await.unwrap();
1779 /// assert!(client.create_room(request).await.is_ok());
1780 /// # };
1781 /// ```
1782 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1783 let invite = request.invite.clone();
1784 let is_direct_room = request.is_direct;
1785 let response = self.send(request).await?;
1786
1787 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1788
1789 let joined_room = Room::new(self.clone(), base_room);
1790
1791 if is_direct_room
1792 && !invite.is_empty()
1793 && let Err(error) =
1794 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1795 {
1796 // FIXME: Retry in the background
1797 error!("Failed to mark room as DM: {error}");
1798 }
1799
1800 Ok(joined_room)
1801 }
1802
1803 /// Create a DM room.
1804 ///
1805 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1806 /// given user being invited, the room marked `is_direct` and both the
1807 /// creator and invitee getting the default maximum power level.
1808 ///
1809 /// If the `e2e-encryption` feature is enabled, the room will also be
1810 /// encrypted.
1811 ///
1812 /// # Arguments
1813 ///
1814 /// * `user_id` - The ID of the user to create a DM for.
1815 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1816 #[cfg(feature = "e2e-encryption")]
1817 let initial_state = vec![
1818 InitialStateEvent::with_empty_state_key(
1819 RoomEncryptionEventContent::with_recommended_defaults(),
1820 )
1821 .to_raw_any(),
1822 ];
1823
1824 #[cfg(not(feature = "e2e-encryption"))]
1825 let initial_state = vec![];
1826
1827 let request = assign!(create_room::v3::Request::new(), {
1828 invite: vec![user_id.to_owned()],
1829 is_direct: true,
1830 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1831 initial_state,
1832 });
1833
1834 self.create_room(request).await
1835 }
1836
1837 /// Get the first existing DM room with the given user, if any.
1838 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1839 self.get_dm_rooms(user_id).next()
1840 }
1841
1842 /// Get an iterator with the existing DM rooms for the given user.
1843 pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1844 let rooms = self.joined_rooms();
1845
1846 let dm_definition = &self.base_client().dm_room_definition;
1847
1848 // Find the room we share with the `user_id` and only with `user_id`
1849 let rooms = rooms.into_iter().filter(move |r| {
1850 let targets = r.direct_targets();
1851 let targets_match =
1852 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1853 match dm_definition {
1854 DmRoomDefinition::MatrixSpec => targets_match,
1855 DmRoomDefinition::TwoMembers => {
1856 let service_members_count =
1857 r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1858 let active_non_service_members =
1859 r.active_members_count().saturating_sub(service_members_count);
1860 targets_match && active_non_service_members <= 2
1861 }
1862 }
1863 });
1864
1865 trace!(?user_id, ?rooms, "Found DM rooms with user");
1866 rooms
1867 }
1868
1869 /// Search the homeserver's directory for public rooms with a filter.
1870 ///
1871 /// # Arguments
1872 ///
1873 /// * `room_search` - The easiest way to create this request is using the
1874 /// `get_public_rooms_filtered::Request` itself.
1875 ///
1876 /// # Examples
1877 ///
1878 /// ```no_run
1879 /// # use url::Url;
1880 /// # use matrix_sdk::Client;
1881 /// # async {
1882 /// # let homeserver = Url::parse("http://example.com")?;
1883 /// use matrix_sdk::ruma::{
1884 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1885 /// };
1886 /// # let mut client = Client::new(homeserver).await?;
1887 ///
1888 /// let mut filter = Filter::new();
1889 /// filter.generic_search_term = Some("rust".to_owned());
1890 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1891 /// request.filter = filter;
1892 ///
1893 /// let response = client.public_rooms_filtered(request).await?;
1894 ///
1895 /// for room in response.chunk {
1896 /// println!("Found room {room:?}");
1897 /// }
1898 /// # anyhow::Ok(()) };
1899 /// ```
1900 pub async fn public_rooms_filtered(
1901 &self,
1902 request: get_public_rooms_filtered::v3::Request,
1903 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1904 self.send(request).await
1905 }
1906
1907 /// Send an arbitrary request to the server, without updating client state.
1908 ///
1909 /// **Warning:** Because this method *does not* update the client state, it
1910 /// is important to make sure that you account for this yourself, and
1911 /// use wrapper methods where available. This method should *only* be
1912 /// used if a wrapper method for the endpoint you'd like to use is not
1913 /// available.
1914 ///
1915 /// # Arguments
1916 ///
1917 /// * `request` - A filled out and valid request for the endpoint to be hit
1918 ///
1919 /// * `timeout` - An optional request timeout setting, this overrides the
1920 /// default request setting if one was set.
1921 ///
1922 /// # Examples
1923 ///
1924 /// ```no_run
1925 /// # use matrix_sdk::{Client, config::SyncSettings};
1926 /// # use url::Url;
1927 /// # async {
1928 /// # let homeserver = Url::parse("http://localhost:8080")?;
1929 /// # let mut client = Client::new(homeserver).await?;
1930 /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1931 ///
1932 /// // First construct the request you want to make
1933 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1934 /// // for all available Endpoints
1935 /// let user_id = owned_user_id!("@example:localhost");
1936 /// let request = profile::get_profile::v3::Request::new(user_id);
1937 ///
1938 /// // Start the request using Client::send()
1939 /// let response = client.send(request).await?;
1940 ///
1941 /// // Check the corresponding Response struct to find out what types are
1942 /// // returned
1943 /// # anyhow::Ok(()) };
1944 /// ```
1945 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1946 where
1947 Request: OutgoingRequest + Clone + Debug,
1948 Request::Authentication: SupportedAuthScheme,
1949 Request::PathBuilder: SupportedPathBuilder,
1950 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1951 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1952 {
1953 SendRequest {
1954 client: self.clone(),
1955 request,
1956 config: None,
1957 send_progress: Default::default(),
1958 }
1959 }
1960
1961 pub(crate) async fn send_inner<Request>(
1962 &self,
1963 request: Request,
1964 config: Option<RequestConfig>,
1965 send_progress: SharedObservable<TransmissionProgress>,
1966 ) -> HttpResult<Request::IncomingResponse>
1967 where
1968 Request: OutgoingRequest + Debug,
1969 Request::Authentication: SupportedAuthScheme,
1970 Request::PathBuilder: SupportedPathBuilder,
1971 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1972 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1973 {
1974 let homeserver = self.homeserver().to_string();
1975 let access_token = self.access_token();
1976 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1977
1978 let path_builder_input =
1979 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1980
1981 let result = self
1982 .inner
1983 .http_client
1984 .send(
1985 request,
1986 config,
1987 homeserver,
1988 access_token.as_deref(),
1989 path_builder_input,
1990 send_progress,
1991 )
1992 .await;
1993
1994 if let Err(Some(ErrorKind::UnknownToken { .. })) =
1995 result.as_ref().map_err(HttpError::client_api_error_kind)
1996 && let Some(access_token) = &access_token
1997 {
1998 // Mark the access token as expired.
1999 self.auth_ctx().set_access_token_expired(access_token);
2000 }
2001
2002 result
2003 }
2004
2005 fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2006 _ = self
2007 .inner
2008 .auth_ctx
2009 .session_change_sender
2010 .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2011 }
2012
2013 /// Fetches server versions from network; no caching.
2014 pub async fn fetch_server_versions(
2015 &self,
2016 request_config: Option<RequestConfig>,
2017 ) -> HttpResult<get_supported_versions::Response> {
2018 // Since this was called by the user, try to refresh the access token if
2019 // necessary.
2020 self.fetch_server_versions_inner(false, request_config).await
2021 }
2022
2023 /// Fetches server versions from network; no caching.
2024 ///
2025 /// If the access token is expired and `failsafe` is `false`, this will
2026 /// attempt to refresh the access token, otherwise this will try to make an
2027 /// unauthenticated request instead.
2028 pub(crate) async fn fetch_server_versions_inner(
2029 &self,
2030 failsafe: bool,
2031 request_config: Option<RequestConfig>,
2032 ) -> HttpResult<get_supported_versions::Response> {
2033 if !failsafe {
2034 // `Client::send()` handles refreshing access tokens.
2035 return self
2036 .send(get_supported_versions::Request::new())
2037 .with_request_config(request_config)
2038 .await;
2039 }
2040
2041 let homeserver = self.homeserver().to_string();
2042
2043 // If we have a fresh access token, try with it first.
2044 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2045 && self.auth_ctx().has_valid_access_token()
2046 && let Some(access_token) = self.access_token()
2047 {
2048 let result = self
2049 .inner
2050 .http_client
2051 .send(
2052 get_supported_versions::Request::new(),
2053 request_config,
2054 homeserver.clone(),
2055 Some(&access_token),
2056 (),
2057 Default::default(),
2058 )
2059 .await;
2060
2061 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2062 result.as_ref().map_err(HttpError::client_api_error_kind)
2063 {
2064 // If the access token is actually expired, mark it as expired and fallback to
2065 // the unauthenticated request below.
2066 self.auth_ctx().set_access_token_expired(&access_token);
2067 } else {
2068 // If the request succeeded or it's an other error, just stop now.
2069 return result;
2070 }
2071 }
2072
2073 // Try without authentication.
2074 self.inner
2075 .http_client
2076 .send(
2077 get_supported_versions::Request::new(),
2078 request_config,
2079 homeserver.clone(),
2080 None,
2081 (),
2082 Default::default(),
2083 )
2084 .await
2085 }
2086
2087 /// Fetches client well_known from network; no caching.
2088 ///
2089 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2090 /// well-known contents.
2091 /// 2. If it's not, we try extracting the server name from the
2092 /// [`Client::user_id`] and building the server URL from it.
2093 /// 3. If we couldn't get the well-known contents with either the explicit
2094 /// server name or the implicit extracted one, we try the homeserver URL
2095 /// as a last resort.
2096 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2097 let homeserver = self.homeserver();
2098 let scheme = homeserver.scheme();
2099
2100 // Use the server name, either an explicit one or an implicit one taken from
2101 // the user id: sometimes we'll have only the homeserver url available and no
2102 // server name, but the server name can be extracted from the current user id.
2103 let server_url = self
2104 .server()
2105 .map(|server| server.to_string())
2106 // If the server name wasn't available, extract it from the user id and build a URL:
2107 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2108 // will be the same for the public server url, lacking a better candidate.
2109 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2110
2111 // If the server name is available, first try using it
2112 let response = if let Some(server_url) = server_url {
2113 // First try using the server name
2114 self.fetch_client_well_known_with_url(server_url).await
2115 } else {
2116 None
2117 };
2118
2119 // If we didn't get a well-known value yet, try with the homeserver url instead:
2120 if response.is_none() {
2121 // Sometimes people configure their well-known directly on the homeserver so use
2122 // this as a fallback when the server name is unknown.
2123 warn!(
2124 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2125 );
2126 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2127 } else {
2128 response
2129 }
2130 }
2131
2132 async fn fetch_client_well_known_with_url(
2133 &self,
2134 url: String,
2135 ) -> Option<discover_homeserver::Response> {
2136 let well_known = self
2137 .inner
2138 .http_client
2139 .send(
2140 discover_homeserver::Request::new(),
2141 Some(RequestConfig::short_retry()),
2142 url,
2143 None,
2144 (),
2145 Default::default(),
2146 )
2147 .await;
2148
2149 match well_known {
2150 Ok(well_known) => Some(well_known),
2151 Err(http_error) => {
2152 // It is perfectly valid to not have a well-known file.
2153 // Maybe we should check for a specific error code to be sure?
2154 warn!("Failed to fetch client well-known: {http_error}");
2155 None
2156 }
2157 }
2158 }
2159
2160 /// Load supported versions from storage, or fetch them from network and
2161 /// cache them.
2162 ///
2163 /// If `failsafe` is true, this will try to minimize side effects to avoid
2164 /// possible deadlocks.
2165 async fn fetch_supported_versions(
2166 &self,
2167 failsafe: bool,
2168 ) -> HttpResult<SupportedVersionsResponse> {
2169 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2170 let supported_versions = SupportedVersionsResponse {
2171 versions: server_versions.versions,
2172 unstable_features: server_versions.unstable_features,
2173 };
2174
2175 Ok(supported_versions)
2176 }
2177
2178 /// Get the Matrix versions and features supported by the homeserver by
2179 /// fetching them from the server or the cache.
2180 ///
2181 /// This is equivalent to calling both [`Client::server_versions()`] and
2182 /// [`Client::unstable_features()`]. To always fetch the result from the
2183 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2184 /// and then `.as_supported_versions()` on the response.
2185 ///
2186 /// # Examples
2187 ///
2188 /// ```no_run
2189 /// use ruma::api::{FeatureFlag, MatrixVersion};
2190 /// # use matrix_sdk::{Client, config::SyncSettings};
2191 /// # use url::Url;
2192 /// # async {
2193 /// # let homeserver = Url::parse("http://localhost:8080")?;
2194 /// # let mut client = Client::new(homeserver).await?;
2195 ///
2196 /// let supported = client.supported_versions().await?;
2197 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2198 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2199 ///
2200 /// let msc_x_feature = FeatureFlag::from("msc_x");
2201 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2202 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2203 /// # anyhow::Ok(()) };
2204 /// ```
2205 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2206 self.supported_versions_inner(false).await
2207 }
2208
2209 /// Get the Matrix versions and features supported by the homeserver by
2210 /// fetching them from the server or the cache.
2211 ///
2212 /// If `failsafe` is true, this will try to minimize side effects to avoid
2213 /// possible deadlocks.
2214 pub(crate) async fn supported_versions_inner(
2215 &self,
2216 failsafe: bool,
2217 ) -> HttpResult<SupportedVersions> {
2218 match self.supported_versions_cached_inner(failsafe).await {
2219 Ok(Some(value)) => {
2220 return Ok(value);
2221 }
2222 Ok(None) => {
2223 // The cache is empty, make a request.
2224 }
2225 Err(error) => {
2226 warn!("error when loading cached supported versions: {error}");
2227 // Fallthrough to make a request.
2228 }
2229 }
2230
2231 self.refresh_supported_versions_cache(failsafe).await
2232 }
2233
2234 /// Refresh the Matrix versions and features supported by the homeserver in
2235 /// the cache.
2236 ///
2237 /// If `failsafe` is true, this will try to minimize side effects to avoid
2238 /// possible deadlocks.
2239 async fn refresh_supported_versions_cache(
2240 &self,
2241 failsafe: bool,
2242 ) -> HttpResult<SupportedVersions> {
2243 let cached_supported_versions = &self.inner.caches.supported_versions;
2244
2245 let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
2246 Ok(guard) => guard,
2247 Err(_) => {
2248 // There is already a refresh in progress, wait for it to finish.
2249 let guard = cached_supported_versions.refresh_lock.lock().await;
2250
2251 if let Err(error) = guard.as_ref() {
2252 // There was an error in the previous refresh, return it.
2253 return Err(HttpError::Cached(error.clone()));
2254 }
2255
2256 // Reuse the data if it was cached and it hasn't expired.
2257 if let CachedValue::Cached(value) = cached_supported_versions.value()
2258 && !value.has_expired()
2259 {
2260 return Ok(value.into_data());
2261 }
2262
2263 // The data wasn't cached or has expired, we need to make another request.
2264 guard
2265 }
2266 };
2267
2268 let response = match self.fetch_supported_versions(failsafe).await {
2269 Ok(response) => {
2270 *supported_versions_guard = Ok(());
2271 TtlValue::new(response)
2272 }
2273 Err(error) => {
2274 let error = Arc::new(error);
2275 *supported_versions_guard = Err(error.clone());
2276 return Err(HttpError::Cached(error));
2277 }
2278 };
2279
2280 let supported_versions = response.as_ref().map(|response| response.supported_versions());
2281
2282 // Only cache the result if the request was authenticated.
2283 if self.auth_ctx().has_valid_access_token() {
2284 if let Err(err) = self
2285 .state_store()
2286 .set_kv_data(
2287 StateStoreDataKey::SupportedVersions,
2288 StateStoreDataValue::SupportedVersions(response),
2289 )
2290 .await
2291 {
2292 warn!("error when caching supported versions: {err}");
2293 }
2294
2295 cached_supported_versions.set_value(supported_versions.clone());
2296 }
2297
2298 Ok(supported_versions.into_data())
2299 }
2300
2301 /// Get the Matrix versions and features supported by the homeserver by
2302 /// fetching them from the cache.
2303 ///
2304 /// For a version of this function that fetches the supported versions and
2305 /// features from the homeserver if the [`SupportedVersions`] aren't
2306 /// found in the cache, take a look at the [`Client::supported_versions()`]
2307 /// method.
2308 ///
2309 /// If the data in the cache has expired, this will trigger a background
2310 /// task to refresh it.
2311 ///
2312 /// # Examples
2313 ///
2314 /// ```no_run
2315 /// use ruma::api::{FeatureFlag, MatrixVersion};
2316 /// # use matrix_sdk::{Client, config::SyncSettings};
2317 /// # use url::Url;
2318 /// # async {
2319 /// # let homeserver = Url::parse("http://localhost:8080")?;
2320 /// # let mut client = Client::new(homeserver).await?;
2321 ///
2322 /// let supported =
2323 /// if let Some(supported) = client.supported_versions_cached().await? {
2324 /// supported
2325 /// } else {
2326 /// client.fetch_server_versions(None).await?.as_supported_versions()
2327 /// };
2328 ///
2329 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2330 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2331 ///
2332 /// let msc_x_feature = FeatureFlag::from("msc_x");
2333 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2334 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2335 /// # anyhow::Ok(()) };
2336 /// ```
2337 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2338 self.supported_versions_cached_inner(false).await
2339 }
2340
2341 async fn supported_versions_cached_inner(
2342 &self,
2343 failsafe: bool,
2344 ) -> Result<Option<SupportedVersions>, StoreError> {
2345 let supported_versions_cache = &self.inner.caches.supported_versions;
2346
2347 let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2348 cached
2349 } else if let Some(stored) = self
2350 .state_store()
2351 .get_kv_data(StateStoreDataKey::SupportedVersions)
2352 .await?
2353 .and_then(|value| value.into_supported_versions())
2354 {
2355 let stored = stored.map(|response| response.supported_versions());
2356
2357 // Copy the data from the store in the in-memory cache.
2358 supported_versions_cache.set_value(stored.clone());
2359
2360 stored
2361 } else {
2362 return Ok(None);
2363 };
2364
2365 // Spawn a task to refresh the cache if it has expired and we have a valid
2366 // access token.
2367 if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2368 debug!("spawning task to refresh supported versions cache");
2369
2370 let client = self.clone();
2371 self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2372 if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2373 warn!("failed to refresh supported versions cache: {error}");
2374 }
2375 });
2376 }
2377
2378 Ok(Some(value.into_data()))
2379 }
2380
2381 /// Get the Matrix versions supported by the homeserver by fetching them
2382 /// from the server or the cache.
2383 ///
2384 /// # Examples
2385 ///
2386 /// ```no_run
2387 /// use ruma::api::MatrixVersion;
2388 /// # use matrix_sdk::{Client, config::SyncSettings};
2389 /// # use url::Url;
2390 /// # async {
2391 /// # let homeserver = Url::parse("http://localhost:8080")?;
2392 /// # let mut client = Client::new(homeserver).await?;
2393 ///
2394 /// let server_versions = client.server_versions().await?;
2395 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2396 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2397 /// # anyhow::Ok(()) };
2398 /// ```
2399 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2400 Ok(self.supported_versions().await?.versions)
2401 }
2402
2403 /// Get the unstable features supported by the homeserver by fetching them
2404 /// from the server or the cache.
2405 ///
2406 /// # Examples
2407 ///
2408 /// ```no_run
2409 /// use matrix_sdk::ruma::api::FeatureFlag;
2410 /// # use matrix_sdk::{Client, config::SyncSettings};
2411 /// # use url::Url;
2412 /// # async {
2413 /// # let homeserver = Url::parse("http://localhost:8080")?;
2414 /// # let mut client = Client::new(homeserver).await?;
2415 ///
2416 /// let msc_x_feature = FeatureFlag::from("msc_x");
2417 /// let unstable_features = client.unstable_features().await?;
2418 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2419 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2420 /// # anyhow::Ok(()) };
2421 /// ```
2422 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2423 Ok(self.supported_versions().await?.features)
2424 }
2425
2426 /// Empty the supported versions and unstable features cache.
2427 ///
2428 /// Since the SDK caches the supported versions, it's possible to have a
2429 /// stale entry in the cache. This functions makes it possible to force
2430 /// reset it.
2431 pub async fn reset_supported_versions(&self) -> Result<()> {
2432 // Empty the in-memory cache.
2433 self.inner.caches.supported_versions.reset();
2434
2435 // Empty the store cache.
2436 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2437 }
2438
2439 /// Get the well-known file of the homeserver from the cache.
2440 ///
2441 /// If the data in the cache has expired, this will trigger a background
2442 /// task to refresh it.
2443 async fn well_known_cached(
2444 &self,
2445 ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2446 let well_known_cache = &self.inner.caches.well_known;
2447
2448 let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2449 cached
2450 } else if let Some(stored) = self
2451 .state_store()
2452 .get_kv_data(StateStoreDataKey::WellKnown)
2453 .await?
2454 .and_then(|value| value.into_well_known())
2455 {
2456 // Copy the data from the store into the in-memory cache.
2457 well_known_cache.set_value(stored.clone());
2458
2459 stored
2460 } else {
2461 return Ok(CachedValue::NotSet);
2462 };
2463
2464 // Spawn a task to refresh the cache if it has expired.
2465 if value.has_expired() {
2466 debug!("spawning task to refresh well-known cache");
2467
2468 let client = self.clone();
2469 self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2470 client.refresh_well_known_cache().await;
2471 });
2472 }
2473
2474 Ok(CachedValue::Cached(value.into_data()))
2475 }
2476
2477 /// Refresh the well-known file of the homeserver in the cache.
2478 async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2479 let well_known_cache = &self.inner.caches.well_known;
2480
2481 let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2482 Ok(guard) => guard,
2483 Err(_) => {
2484 // There is already a refresh in progress, wait for it to finish.
2485 let guard = well_known_cache.refresh_lock.lock().await;
2486
2487 // A refresh can't fail because we ignore failures, so there shouldn't be an
2488 // error in the refresh lock.
2489
2490 // Reuse the data if it was cached and it hasn't expired.
2491 if let CachedValue::Cached(value) = well_known_cache.value()
2492 && !value.has_expired()
2493 {
2494 return value.into_data();
2495 }
2496
2497 // The data wasn't cached or has expired, we need to make another request.
2498 guard
2499 }
2500 };
2501
2502 let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2503
2504 if let Err(err) = self
2505 .state_store()
2506 .set_kv_data(
2507 StateStoreDataKey::WellKnown,
2508 StateStoreDataValue::WellKnown(well_known.clone()),
2509 )
2510 .await
2511 {
2512 warn!("error when caching well-known: {err}");
2513 }
2514
2515 well_known_cache.set_value(well_known.clone());
2516
2517 well_known.into_data()
2518 }
2519
2520 /// Get the well-known file of the homeserver by fetching it from the server
2521 /// or the cache.
2522 async fn well_known(&self) -> Option<WellKnownResponse> {
2523 match self.well_known_cached().await {
2524 Ok(CachedValue::Cached(value)) => {
2525 return value;
2526 }
2527 Ok(CachedValue::NotSet) => {
2528 // The cache is empty, make a request.
2529 }
2530 Err(error) => {
2531 warn!("error when loading cached well-known: {error}");
2532 // Fallthrough to make a request.
2533 }
2534 }
2535
2536 self.refresh_well_known_cache().await
2537 }
2538
2539 /// Get information about the homeserver's advertised RTC foci by fetching
2540 /// the well-known file from the server or the cache.
2541 ///
2542 /// # Examples
2543 /// ```no_run
2544 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2545 /// # use url::Url;
2546 /// # async {
2547 /// # let homeserver = Url::parse("http://localhost:8080")?;
2548 /// # let mut client = Client::new(homeserver).await?;
2549 /// let rtc_foci = client.rtc_foci().await?;
2550 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2551 /// RtcFocusInfo::LiveKit(info) => Some(info),
2552 /// _ => None,
2553 /// });
2554 /// if let Some(info) = default_livekit_focus_info {
2555 /// println!("Default LiveKit service URL: {}", info.service_url);
2556 /// }
2557 /// # anyhow::Ok(()) };
2558 /// ```
2559 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2560 let well_known = self.well_known().await;
2561
2562 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2563 }
2564
2565 /// Empty the well-known cache.
2566 ///
2567 /// Since the SDK caches the well-known, it's possible to have a stale entry
2568 /// in the cache. This functions makes it possible to force reset it.
2569 pub async fn reset_well_known(&self) -> Result<()> {
2570 // Empty the in-memory caches.
2571 self.inner.caches.well_known.reset();
2572
2573 // Empty the store cache.
2574 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2575 }
2576
2577 /// Check whether MSC 4028 is enabled on the homeserver.
2578 ///
2579 /// # Examples
2580 ///
2581 /// ```no_run
2582 /// # use matrix_sdk::{Client, config::SyncSettings};
2583 /// # use url::Url;
2584 /// # async {
2585 /// # let homeserver = Url::parse("http://localhost:8080")?;
2586 /// # let mut client = Client::new(homeserver).await?;
2587 /// let msc4028_enabled =
2588 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2589 /// # anyhow::Ok(()) };
2590 /// ```
2591 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2592 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2593 }
2594
2595 /// Get information of all our own devices.
2596 ///
2597 /// # Examples
2598 ///
2599 /// ```no_run
2600 /// # use matrix_sdk::{Client, config::SyncSettings};
2601 /// # use url::Url;
2602 /// # async {
2603 /// # let homeserver = Url::parse("http://localhost:8080")?;
2604 /// # let mut client = Client::new(homeserver).await?;
2605 /// let response = client.devices().await?;
2606 ///
2607 /// for device in response.devices {
2608 /// println!(
2609 /// "Device: {} {}",
2610 /// device.device_id,
2611 /// device.display_name.as_deref().unwrap_or("")
2612 /// );
2613 /// }
2614 /// # anyhow::Ok(()) };
2615 /// ```
2616 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2617 let request = get_devices::v3::Request::new();
2618
2619 self.send(request).await
2620 }
2621
2622 /// Delete the given devices from the server.
2623 ///
2624 /// # Arguments
2625 ///
2626 /// * `devices` - The list of devices that should be deleted from the
2627 /// server.
2628 ///
2629 /// * `auth_data` - This request requires user interactive auth, the first
2630 /// request needs to set this to `None` and will always fail with an
2631 /// `UiaaResponse`. The response will contain information for the
2632 /// interactive auth and the same request needs to be made but this time
2633 /// with some `auth_data` provided.
2634 ///
2635 /// ```no_run
2636 /// # use matrix_sdk::{
2637 /// # ruma::{api::client::uiaa, owned_device_id},
2638 /// # Client, Error, config::SyncSettings,
2639 /// # };
2640 /// # use serde_json::json;
2641 /// # use url::Url;
2642 /// # use std::collections::BTreeMap;
2643 /// # async {
2644 /// # let homeserver = Url::parse("http://localhost:8080")?;
2645 /// # let mut client = Client::new(homeserver).await?;
2646 /// let devices = &[owned_device_id!("DEVICEID")];
2647 ///
2648 /// if let Err(e) = client.delete_devices(devices, None).await {
2649 /// if let Some(info) = e.as_uiaa_response() {
2650 /// let mut password = uiaa::Password::new(
2651 /// uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2652 /// "wordpass".to_owned(),
2653 /// );
2654 /// password.session = info.session.clone();
2655 ///
2656 /// client
2657 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2658 /// .await?;
2659 /// }
2660 /// }
2661 /// # anyhow::Ok(()) };
2662 pub async fn delete_devices(
2663 &self,
2664 devices: &[OwnedDeviceId],
2665 auth_data: Option<uiaa::AuthData>,
2666 ) -> HttpResult<delete_devices::v3::Response> {
2667 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2668 request.auth = auth_data;
2669
2670 self.send(request).await
2671 }
2672
2673 /// Change the display name of a device owned by the current user.
2674 ///
2675 /// Returns a `update_device::Response` which specifies the result
2676 /// of the operation.
2677 ///
2678 /// # Arguments
2679 ///
2680 /// * `device_id` - The ID of the device to change the display name of.
2681 /// * `display_name` - The new display name to set.
2682 pub async fn rename_device(
2683 &self,
2684 device_id: &DeviceId,
2685 display_name: &str,
2686 ) -> HttpResult<update_device::v3::Response> {
2687 let mut request = update_device::v3::Request::new(device_id.to_owned());
2688 request.display_name = Some(display_name.to_owned());
2689
2690 self.send(request).await
2691 }
2692
2693 /// Check whether a device with a specific ID exists on the server.
2694 ///
2695 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2696 /// with 404 and the underlying error otherwise.
2697 ///
2698 /// # Arguments
2699 ///
2700 /// * `device_id` - The ID of the device to query.
2701 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2702 let request = device::get_device::v3::Request::new(device_id);
2703 match self.send(request).await {
2704 Ok(_) => Ok(true),
2705 Err(err) => {
2706 if let Some(error) = err.as_client_api_error()
2707 && error.status_code == 404
2708 {
2709 Ok(false)
2710 } else {
2711 Err(err.into())
2712 }
2713 }
2714 }
2715 }
2716
2717 /// Synchronize the client's state with the latest state on the server.
2718 ///
2719 /// ## Syncing Events
2720 ///
2721 /// Messages or any other type of event need to be periodically fetched from
2722 /// the server, this is achieved by sending a `/sync` request to the server.
2723 ///
2724 /// The first sync is sent out without a [`token`]. The response of the
2725 /// first sync will contain a [`next_batch`] field which should then be
2726 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2727 /// don't receive the same events multiple times.
2728 ///
2729 /// ## Long Polling
2730 ///
2731 /// A sync should in the usual case always be in flight. The
2732 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2733 /// long the server will wait for new events before it will respond.
2734 /// The server will respond immediately if some new events arrive before the
2735 /// timeout has expired. If no changes arrive and the timeout expires an
2736 /// empty sync response will be sent to the client.
2737 ///
2738 /// This method of sending a request that may not receive a response
2739 /// immediately is called long polling.
2740 ///
2741 /// ## Filtering Events
2742 ///
2743 /// The number or type of messages and events that the client should receive
2744 /// from the server can be altered using a [`Filter`].
2745 ///
2746 /// Filters can be non-trivial and, since they will be sent with every sync
2747 /// request, they may take up a bunch of unnecessary bandwidth.
2748 ///
    /// Luckily, filters can be uploaded to the server and reused via a unique
    /// identifier; this can be achieved using the [`get_or_upload_filter()`]
    /// method.
2752 ///
2753 /// # Arguments
2754 ///
2755 /// * `sync_settings` - Settings for the sync call, this allows us to set
2756 /// various options to configure the sync:
2757 /// * [`filter`] - To configure which events we receive and which get
2758 /// [filtered] by the server
2759 /// * [`timeout`] - To configure our [long polling] setup.
2760 /// * [`token`] - To tell the server which events we already received
2761 /// and where we wish to continue syncing.
2762 /// * [`full_state`] - To tell the server that we wish to receive all
2763 /// state events, regardless of our configured [`token`].
2764 /// * [`set_presence`] - To tell the server to set the presence and to
2765 /// which state.
2766 ///
2767 /// # Examples
2768 ///
2769 /// ```no_run
2770 /// # use url::Url;
2771 /// # async {
2772 /// # let homeserver = Url::parse("http://localhost:8080")?;
2773 /// # let username = "";
2774 /// # let password = "";
2775 /// use matrix_sdk::{
2776 /// Client, config::SyncSettings,
2777 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2778 /// };
2779 ///
2780 /// let client = Client::new(homeserver).await?;
2781 /// client.matrix_auth().login_username(username, password).send().await?;
2782 ///
2783 /// // Sync once so we receive the client state and old messages.
2784 /// client.sync_once(SyncSettings::default()).await?;
2785 ///
2786 /// // Register our handler so we start responding once we receive a new
2787 /// // event.
2788 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2789 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2790 /// });
2791 ///
2792 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2793 /// // from our `sync_once()` call automatically.
2794 /// client.sync(SyncSettings::default()).await;
2795 /// # anyhow::Ok(()) };
2796 /// ```
2797 ///
2798 /// [`sync`]: #method.sync
2799 /// [`SyncSettings`]: crate::config::SyncSettings
2800 /// [`token`]: crate::config::SyncSettings#method.token
2801 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2802 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2803 /// [`set_presence`]: ruma::presence::PresenceState
2804 /// [`filter`]: crate::config::SyncSettings#method.filter
2805 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2806 /// [`next_batch`]: SyncResponse#structfield.next_batch
2807 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2808 /// [long polling]: #long-polling
2809 /// [filtered]: #filtering-events
2810 #[instrument(skip(self))]
2811 pub async fn sync_once(
2812 &self,
2813 sync_settings: crate::config::SyncSettings,
2814 ) -> Result<SyncResponse> {
2815 // The sync might not return for quite a while due to the timeout.
2816 // We'll see if there's anything crypto related to send out before we
2817 // sync, i.e. if we closed our client after a sync but before the
2818 // crypto requests were sent out.
2819 //
2820 // This will mostly be a no-op.
2821 #[cfg(feature = "e2e-encryption")]
2822 if let Err(e) = self.send_outgoing_requests().await {
2823 error!(error = ?e, "Error while sending outgoing E2EE requests");
2824 }
2825
2826 let token = match sync_settings.token {
2827 SyncToken::Specific(token) => Some(token),
2828 SyncToken::NoToken => None,
2829 SyncToken::ReusePrevious => self.sync_token().await,
2830 };
2831
2832 let request = assign!(sync_events::v3::Request::new(), {
2833 filter: sync_settings.filter.map(|f| *f),
2834 since: token,
2835 full_state: sync_settings.full_state,
2836 set_presence: sync_settings.set_presence,
2837 timeout: sync_settings.timeout,
2838 use_state_after: true,
2839 });
2840 let mut request_config = self.request_config();
2841 if let Some(timeout) = sync_settings.timeout {
2842 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2843 request_config.timeout = Some(base_timeout + timeout);
2844 }
2845
2846 let response = self.send(request).with_request_config(request_config).await?;
2847 let next_batch = response.next_batch.clone();
2848 let response = self.process_sync(response).await?;
2849
2850 #[cfg(feature = "e2e-encryption")]
2851 if let Err(e) = self.send_outgoing_requests().await {
2852 error!(error = ?e, "Error while sending outgoing E2EE requests");
2853 }
2854
2855 self.inner.sync_beat.notify(usize::MAX);
2856
2857 Ok(SyncResponse::new(next_batch, response))
2858 }
2859
2860 /// Repeatedly synchronize the client state with the server.
2861 ///
2862 /// This method will only return on error, if cancellation is needed
2863 /// the method should be wrapped in a cancelable task or the
2864 /// [`Client::sync_with_callback`] method can be used or
2865 /// [`Client::sync_with_result_callback`] if you want to handle error
2866 /// cases in the loop, too.
2867 ///
2868 /// This method will internally call [`Client::sync_once`] in a loop.
2869 ///
2870 /// This method can be used with the [`Client::add_event_handler`]
2871 /// method to react to individual events. If you instead wish to handle
2872 /// events in a bulk manner the [`Client::sync_with_callback`],
2873 /// [`Client::sync_with_result_callback`] and
2874 /// [`Client::sync_stream`] methods can be used instead. Those methods
2875 /// repeatedly return the whole sync response.
2876 ///
2877 /// # Arguments
2878 ///
2879 /// * `sync_settings` - Settings for the sync call. *Note* that those
2880 /// settings will be only used for the first sync call. See the argument
2881 /// docs for [`Client::sync_once`] for more info.
2882 ///
2883 /// # Return
2884 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2885 /// up to the user of the API to check the error and decide whether the sync
2886 /// should continue or not.
2887 ///
2888 /// # Examples
2889 ///
2890 /// ```no_run
2891 /// # use url::Url;
2892 /// # async {
2893 /// # let homeserver = Url::parse("http://localhost:8080")?;
2894 /// # let username = "";
2895 /// # let password = "";
2896 /// use matrix_sdk::{
2897 /// Client, config::SyncSettings,
2898 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2899 /// };
2900 ///
2901 /// let client = Client::new(homeserver).await?;
2902 /// client.matrix_auth().login_username(&username, &password).send().await?;
2903 ///
2904 /// // Register our handler so we start responding once we receive a new
2905 /// // event.
2906 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2907 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2908 /// });
2909 ///
2910 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2911 /// // automatically.
2912 /// client.sync(SyncSettings::default()).await?;
2913 /// # anyhow::Ok(()) };
2914 /// ```
2915 ///
2916 /// [argument docs]: #method.sync_once
2917 /// [`sync_with_callback`]: #method.sync_with_callback
2918 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2919 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2920 }
2921
2922 /// Repeatedly call sync to synchronize the client state with the server.
2923 ///
2924 /// # Arguments
2925 ///
2926 /// * `sync_settings` - Settings for the sync call. *Note* that those
2927 /// settings will be only used for the first sync call. See the argument
2928 /// docs for [`Client::sync_once`] for more info.
2929 ///
    /// * `callback` - A callback that will be called every time a successful
    ///   response has been fetched from the server. The callback must return a
    ///   `LoopCtrl` which signals whether the method should stop syncing. If
    ///   the callback returns `LoopCtrl::Continue` the sync will continue, if
    ///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2935 ///
2936 /// # Return
2937 /// The sync runs until an error occurs or the
2938 /// callback indicates that the Loop should stop. If the callback asked for
2939 /// a regular stop, the result will be `Ok(())` otherwise the
2940 /// `Err(Error)` is returned.
2941 ///
2942 /// # Examples
2943 ///
2944 /// The following example demonstrates how to sync forever while sending all
2945 /// the interesting events through a mpsc channel to another thread e.g. a
2946 /// UI thread.
2947 ///
2948 /// ```no_run
2949 /// # use std::time::Duration;
2950 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2951 /// # use url::Url;
2952 /// # async {
2953 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2954 /// # let mut client = Client::new(homeserver).await.unwrap();
2955 ///
2956 /// use tokio::sync::mpsc::channel;
2957 ///
2958 /// let (tx, rx) = channel(100);
2959 ///
2960 /// let sync_channel = &tx;
2961 /// let sync_settings = SyncSettings::new()
2962 /// .timeout(Duration::from_secs(30));
2963 ///
2964 /// client
2965 /// .sync_with_callback(sync_settings, |response| async move {
2966 /// let channel = sync_channel;
2967 /// for (room_id, room) in response.rooms.joined {
2968 /// for event in room.timeline.events {
2969 /// channel.send(event).await.unwrap();
2970 /// }
2971 /// }
2972 ///
2973 /// LoopCtrl::Continue
2974 /// })
2975 /// .await;
2976 /// };
2977 /// ```
2978 #[instrument(skip_all)]
2979 pub async fn sync_with_callback<C>(
2980 &self,
2981 sync_settings: crate::config::SyncSettings,
2982 callback: impl Fn(SyncResponse) -> C,
2983 ) -> Result<(), Error>
2984 where
2985 C: Future<Output = LoopCtrl>,
2986 {
2987 self.sync_with_result_callback(sync_settings, |result| async {
2988 Ok(callback(result?).await)
2989 })
2990 .await
2991 }
2992
2993 /// Repeatedly call sync to synchronize the client state with the server.
2994 ///
2995 /// # Arguments
2996 ///
2997 /// * `sync_settings` - Settings for the sync call. *Note* that those
2998 /// settings will be only used for the first sync call. See the argument
2999 /// docs for [`Client::sync_once`] for more info.
3000 ///
3001 /// * `callback` - A callback that will be called every time after a
3002 /// response has been received, failure or not. The callback returns a
3003 /// `Result<LoopCtrl, Error>`, too. When returning
3004 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3005 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3006 /// function returns `Ok(())`. In case the callback can't handle the
3007 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
3008 /// which results in the sync ending and the `Err(Error)` being returned.
3009 ///
3010 /// # Return
3011 /// The sync runs until an error occurs that the callback can't handle or
3012 /// the callback indicates that the Loop should stop. If the callback
3013 /// asked for a regular stop, the result will be `Ok(())` otherwise the
3014 /// `Err(Error)` is returned.
3015 ///
    /// _Note_: Lower-level configuration (e.g. for retries) is not changed by
    /// this, and is handled first without sending the result to the callback.
    /// Only once those lower-level mechanisms have given up is the `Result`
    /// handed to the callback.
3020 ///
3021 /// # Examples
3022 ///
3023 /// The following example demonstrates how to sync forever while sending all
3024 /// the interesting events through a mpsc channel to another thread e.g. a
3025 /// UI thread.
3026 ///
3027 /// ```no_run
3028 /// # use std::time::Duration;
3029 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3030 /// # use url::Url;
3031 /// # async {
3032 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3033 /// # let mut client = Client::new(homeserver).await.unwrap();
3034 /// #
3035 /// use tokio::sync::mpsc::channel;
3036 ///
3037 /// let (tx, rx) = channel(100);
3038 ///
3039 /// let sync_channel = &tx;
3040 /// let sync_settings = SyncSettings::new()
3041 /// .timeout(Duration::from_secs(30));
3042 ///
3043 /// client
3044 /// .sync_with_result_callback(sync_settings, |response| async move {
3045 /// let channel = sync_channel;
3046 /// let sync_response = response?;
3047 /// for (room_id, room) in sync_response.rooms.joined {
3048 /// for event in room.timeline.events {
3049 /// channel.send(event).await.unwrap();
3050 /// }
3051 /// }
3052 ///
3053 /// Ok(LoopCtrl::Continue)
3054 /// })
3055 /// .await;
3056 /// };
3057 /// ```
3058 #[instrument(skip(self, callback))]
3059 pub async fn sync_with_result_callback<C>(
3060 &self,
3061 sync_settings: crate::config::SyncSettings,
3062 callback: impl Fn(Result<SyncResponse, Error>) -> C,
3063 ) -> Result<(), Error>
3064 where
3065 C: Future<Output = Result<LoopCtrl, Error>>,
3066 {
3067 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3068
3069 while let Some(result) = sync_stream.next().await {
3070 trace!("Running callback");
3071 if callback(result).await? == LoopCtrl::Break {
3072 trace!("Callback told us to stop");
3073 break;
3074 }
3075 trace!("Done running callback");
3076 }
3077
3078 Ok(())
3079 }
3080
    /// Repeatedly synchronize the client state with the server.
3082 ///
3083 /// This method will internally call [`Client::sync_once`] in a loop and is
3084 /// equivalent to the [`Client::sync`] method but the responses are provided
3085 /// as an async stream.
3086 ///
3087 /// # Arguments
3088 ///
3089 /// * `sync_settings` - Settings for the sync call. *Note* that those
3090 /// settings will be only used for the first sync call. See the argument
3091 /// docs for [`Client::sync_once`] for more info.
3092 ///
3093 /// # Examples
3094 ///
3095 /// ```no_run
3096 /// # use url::Url;
3097 /// # async {
3098 /// # let homeserver = Url::parse("http://localhost:8080")?;
3099 /// # let username = "";
3100 /// # let password = "";
3101 /// use futures_util::StreamExt;
3102 /// use matrix_sdk::{Client, config::SyncSettings};
3103 ///
3104 /// let client = Client::new(homeserver).await?;
3105 /// client.matrix_auth().login_username(&username, &password).send().await?;
3106 ///
3107 /// let mut sync_stream =
3108 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
3109 ///
3110 /// while let Some(Ok(response)) = sync_stream.next().await {
3111 /// for room in response.rooms.joined.values() {
3112 /// for e in &room.timeline.events {
3113 /// if let Ok(event) = e.raw().deserialize() {
3114 /// println!("Received event {:?}", event);
3115 /// }
3116 /// }
3117 /// }
3118 /// }
3119 ///
3120 /// # anyhow::Ok(()) };
3121 /// ```
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(skip(self))]
    pub async fn sync_stream(
        &self,
        mut sync_settings: crate::config::SyncSettings,
    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
        // State shared across iterations of the stream's loop.
        let mut is_first_sync = true;
        // Parking spot for the timeout taken out of `sync_settings` for the
        // first sync, until it is put back for the following ones.
        let mut timeout = None;
        // Handed to `Client::delay_sync` after every iteration.
        // NOTE(review): presumably used to throttle back-to-back syncs —
        // confirm against `delay_sync`.
        let mut last_sync_time: Option<Instant> = None;

        // Captured here so that every sync in the loop is instrumented with
        // the span that was current when `sync_stream` was called.
        let parent_span = Span::current();

        async_stream::stream!({
            loop {
                trace!("Syncing");

                if sync_settings.ignore_timeout_on_first_sync {
                    if is_first_sync {
                        // First sync: run it without a timeout, but remember
                        // the configured value.
                        timeout = sync_settings.timeout.take();
                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
                        // Later syncs: restore the remembered timeout, unless
                        // the settings already carry one.
                        sync_settings.timeout = timeout.take();
                    }

                    is_first_sync = false;
                }

                // Successes and failures are both yielded; the stream never
                // ends by itself.
                yield self
                    .sync_loop_helper(&mut sync_settings)
                    .instrument(parent_span.clone())
                    .await;

                Client::delay_sync(&mut last_sync_time).await
            }
        })
    }
3157
    /// Get the current sync token of the client, if any.
    ///
    /// This will be `None` if the client didn't sync at least once.
    pub(crate) async fn sync_token(&self) -> Option<String> {
        self.inner.base_client.sync_token().await
    }
3163
3164 /// Gets information about the owner of a given access token.
3165 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3166 let request = whoami::v3::Request::new();
3167 self.send(request).await
3168 }
3169
3170 /// Subscribes a new receiver to client SessionChange broadcasts.
3171 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3172 let broadcast = &self.auth_ctx().session_change_sender;
3173 broadcast.subscribe()
3174 }
3175
3176 /// Sets the save/restore session callbacks.
3177 ///
3178 /// This is another mechanism to get synchronous updates to session tokens,
3179 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3180 pub fn set_session_callbacks(
3181 &self,
3182 reload_session_callback: Box<ReloadSessionCallback>,
3183 save_session_callback: Box<SaveSessionCallback>,
3184 ) -> Result<()> {
3185 self.inner
3186 .auth_ctx
3187 .reload_session_callback
3188 .set(reload_session_callback)
3189 .map_err(|_| Error::MultipleSessionCallbacks)?;
3190
3191 self.inner
3192 .auth_ctx
3193 .save_session_callback
3194 .set(save_session_callback)
3195 .map_err(|_| Error::MultipleSessionCallbacks)?;
3196
3197 Ok(())
3198 }
3199
3200 /// Get the notification settings of the current owner of the client.
3201 pub async fn notification_settings(&self) -> NotificationSettings {
3202 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3203 NotificationSettings::new(self.clone(), ruleset)
3204 }
3205
    /// Create a new specialized `Client` that can process notifications.
    ///
    /// The new client is built from this client's pieces: the authentication
    /// context, HTTP client, event cache, send queue data, latest events and
    /// thread subscription catch-up handles are cloned, while its base client
    /// comes from `BaseClient::clone_with_in_memory_state_store`.
    ///
    /// See [`CrossProcessLock::new`] to learn more about
    /// `cross_process_lock_config`.
    ///
    /// # Errors
    ///
    /// Fails if the base client can't be cloned with an in-memory state
    /// store.
    ///
    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
    pub async fn notification_client(
        &self,
        cross_process_lock_config: CrossProcessLockConfig,
    ) -> Result<Client> {
        let client = Client {
            inner: ClientInner::new(
                self.inner.auth_ctx.clone(),
                self.server().cloned(),
                self.homeserver(),
                self.sliding_sync_version(),
                self.inner.http_client.clone(),
                // NOTE(review): the meaning of the `false` flag isn't visible
                // from here — confirm against
                // `clone_with_in_memory_state_store`.
                self.inner
                    .base_client
                    .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
                    .await?,
                // Hand over the current values of the server info caches.
                self.inner.caches.supported_versions.value(),
                self.inner.caches.well_known.value(),
                self.inner.respect_login_well_known,
                self.inner.event_cache.clone(),
                self.inner.send_queue_data.clone(),
                self.inner.latest_events.clone(),
                #[cfg(feature = "e2e-encryption")]
                self.inner.e2ee.encryption_settings,
                #[cfg(feature = "e2e-encryption")]
                self.inner.enable_share_history_on_invite,
                cross_process_lock_config,
                #[cfg(feature = "experimental-search")]
                self.inner.search_index.clone(),
                self.inner.thread_subscription_catchup.clone(),
            )
            .await,
        };

        Ok(client)
    }
3247
    /// The [`EventCache`] instance for this [`Client`].
    ///
    /// # Panics
    ///
    /// Panics if the event cache hasn't been initialized; per the note in the
    /// body, the `Client` constructor always initializes it.
    pub fn event_cache(&self) -> &EventCache {
        // Invariant (not a memory-safety matter): always initialized in the
        // `Client` ctor.
        self.inner.event_cache.get().unwrap()
    }
3253
3254 /// The [`LatestEvents`] instance for this [`Client`].
3255 pub async fn latest_events(&self) -> &LatestEvents {
3256 self.inner
3257 .latest_events
3258 .get_or_init(|| async {
3259 LatestEvents::new(
3260 WeakClient::from_client(self),
3261 self.event_cache().clone(),
3262 SendQueue::new(self.clone()),
3263 self.room_info_notable_update_receiver(),
3264 )
3265 })
3266 .await
3267 }
3268
3269 /// Waits until an at least partially synced room is received, and returns
3270 /// it.
3271 ///
3272 /// **Note: this function will loop endlessly until either it finds the room
3273 /// or an externally set timeout happens.**
3274 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3275 loop {
3276 if let Some(room) = self.get_room(room_id) {
3277 if room.is_state_partially_or_fully_synced() {
3278 debug!("Found just created room!");
3279 return room;
3280 }
3281 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3282 } else {
3283 debug!("Room wasn't found, waiting for sync beat to try again");
3284 }
3285 self.inner.sync_beat.listen().await;
3286 }
3287 }
3288
3289 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3290 /// join it.
3291 pub async fn knock(
3292 &self,
3293 room_id_or_alias: OwnedRoomOrAliasId,
3294 reason: Option<String>,
3295 server_names: Vec<OwnedServerName>,
3296 ) -> Result<Room> {
3297 let request =
3298 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3299 let response = self.send(request).await?;
3300 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3301 Ok(Room::new(self.clone(), base_room))
3302 }
3303
    /// Checks whether the provided `user_id` belongs to an ignored user.
    ///
    /// The check is delegated to the base client.
    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
        self.base_client().is_user_ignored(user_id).await
    }
3308
3309 /// Gets the `max_upload_size` value from the homeserver, getting either a
3310 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3311 /// missing.
3312 ///
3313 /// Check the spec for more info:
3314 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3315 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3316 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3317 if let Some(data) = max_upload_size_lock.get() {
3318 return Ok(data.to_owned());
3319 }
3320
3321 // Use the authenticated endpoint when the server supports it.
3322 let supported_versions = self.supported_versions().await?;
3323 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3324 .is_supported(&supported_versions);
3325
3326 let upload_size = if use_auth {
3327 self.send(authenticated_media::get_media_config::v1::Request::default())
3328 .await?
3329 .upload_size
3330 } else {
3331 #[allow(deprecated)]
3332 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3333 };
3334
3335 match max_upload_size_lock.set(upload_size) {
3336 Ok(_) => Ok(upload_size),
3337 Err(error) => {
3338 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3339 }
3340 }
3341 }
3342
    /// The settings to use for decrypting events.
    ///
    /// These are the settings stored on the base client.
    #[cfg(feature = "e2e-encryption")]
    pub fn decryption_settings(&self) -> &DecryptionSettings {
        &self.base_client().decryption_settings
    }
3348
    /// Returns the [`SearchIndex`] for this [`Client`].
    ///
    /// Only available when the `experimental-search` feature is enabled.
    #[cfg(feature = "experimental-search")]
    pub fn search_index(&self) -> &SearchIndex {
        &self.inner.search_index
    }
3354
3355 /// Whether the client is configured to take thread subscriptions (MSC4306
3356 /// and MSC4308) into account, and the server enabled the experimental
3357 /// feature flag for it.
3358 ///
3359 /// This may cause filtering out of thread subscriptions, and loading the
3360 /// thread subscriptions via the sliding sync extension, when the room
3361 /// list service is being used.
3362 ///
3363 /// This is async and fallible as it may use the network to retrieve the
3364 /// server supported features, if they aren't cached already.
3365 pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3366 // Check if the client is configured to support thread subscriptions first.
3367 match self.base_client().threading_support {
3368 ThreadingSupport::Enabled { with_subscriptions: false }
3369 | ThreadingSupport::Disabled => return Ok(false),
3370 ThreadingSupport::Enabled { with_subscriptions: true } => {}
3371 }
3372
3373 // Now, let's check that the server supports it.
3374 let server_enabled = self
3375 .supported_versions()
3376 .await?
3377 .features
3378 .contains(&FeatureFlag::from("org.matrix.msc4306"));
3379
3380 Ok(server_enabled)
3381 }
3382
3383 /// Fetch thread subscriptions changes between `from` and up to `to`.
3384 ///
3385 /// The `limit` optional parameter can be used to limit the number of
3386 /// entries in a response. It can also be overridden by the server, if
3387 /// it's deemed too large.
3388 pub async fn fetch_thread_subscriptions(
3389 &self,
3390 from: Option<String>,
3391 to: Option<String>,
3392 limit: Option<UInt>,
3393 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3394 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3395 from,
3396 to,
3397 limit,
3398 });
3399 Ok(self.send(request).await?)
3400 }
3401
    /// The [`ThreadSubscriptionCatchup`] instance for this [`Client`].
    ///
    /// # Panics
    ///
    /// Panics if the instance hasn't been initialized yet.
    // NOTE(review): presumably always initialized when the client is built,
    // like the event cache — confirm.
    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
        self.inner.thread_subscription_catchup.get().unwrap()
    }
3405
3406 /// Pause the client for background suspension.
3407 ///
3408 /// This method:
3409 /// 1. Disables all send queues (prevents new message sends).
3410 /// 2. Pauses all database stores, waiting for in-flight operations and
3411 /// releasing all connections and file locks.
3412 ///
3413 /// Call [`Client::resume()`] when the app returns to the foreground.
3414 ///
3415 /// # iOS
3416 ///
3417 /// Call this before the app is suspended to avoid `0xdead10cc` kills.
3418 /// Typically called from
3419 /// [`applicationDidEnterBackground`](https://developer.apple.com/documentation/uikit/uiapplicationdelegate/applicationdidenterbackground(_:))
3420 /// or an equivalent SwiftUI lifecycle event, *after* stopping the
3421 /// `matrix_sdk_ui::sync_service::SyncService`.
3422 pub async fn pause(&self) -> Result<()> {
3423 info!("Client::pause — releasing database resources");
3424
3425 // Disable send queues so no new sends hit the stores.
3426 self.send_queue().set_enabled(false).await;
3427
3428 // Close all stores (waits for in-flight ops, closes connections).
3429 self.base_client().close_stores().await?;
3430
3431 info!("Client::pause — complete, all database connections released");
3432 Ok(())
3433 }
3434
3435 /// Resume the client after a [`Client::pause()`].
3436 ///
3437 /// Re-acquires store resources and re-enables send queues.
3438 ///
3439 /// If your app stopped the `matrix_sdk_ui::sync_service::SyncService`
3440 /// before pausing, restart it separately as appropriate for your app
3441 /// lifecycle.
3442 pub async fn resume(&self) -> Result<()> {
3443 info!("Client::resume — re-acquiring database resources");
3444
3445 // Reopen stores (creates new connection pools).
3446 self.base_client().reopen_stores().await?;
3447
3448 // Re-enable send queues.
3449 self.send_queue().set_enabled(true).await;
3450
3451 info!("Client::resume — complete");
3452 Ok(())
3453 }
3454
    /// Perform database optimizations if any are available, i.e. vacuuming in
    /// SQLite.
    ///
    /// The state store, the event cache store and the media store are
    /// optimized one after the other; the first failure aborts the whole
    /// operation.
    ///
    /// **Warning:** this was added to check if SQLite fragmentation was the
    /// source of performance issues, **DO NOT use in production**.
    #[doc(hidden)]
    pub async fn optimize_stores(&self) -> Result<()> {
        trace!("Optimizing state store...");
        self.state_store().optimize().await?;

        trace!("Optimizing event cache store...");
        // The event cache store is only optimized when `as_clean()` yields a
        // lock; otherwise it is skipped silently.
        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
            clean_lock.optimize().await?;
        }

        trace!("Optimizing media store...");
        self.media_store().lock().await?.optimize().await?;

        Ok(())
    }
3475
    /// Returns the sizes of the existing stores, if known.
    ///
    /// A store whose size can't be computed is reported as `None` rather than
    /// as an error.
    ///
    /// # Errors
    ///
    /// Fails only if the lock of the event cache store or of the media store
    /// can't be acquired.
    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
        // The crypto store is only reachable through the Olm machine; without
        // one, or if the size query fails, the size stays unknown.
        #[cfg(feature = "e2e-encryption")]
        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
        {
            Some(store_size)
        } else {
            None
        };
        // Without end-to-end encryption support, no crypto store is
        // inspected.
        #[cfg(not(feature = "e2e-encryption"))]
        let crypto_store_size = None;

        // Errors while computing a size are turned into an unknown size.
        let state_store_size = self.state_store().get_size().await.ok().flatten();

        // Only measured when `as_clean()` yields a lock.
        let event_cache_store_size = if let Some(clean_lock) =
            self.event_cache_store().lock().await?.as_clean()
            && let Ok(Some(store_size)) = clean_lock.get_size().await
        {
            Some(store_size)
        } else {
            None
        };

        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();

        Ok(StoreSizes {
            crypto_store: crypto_store_size,
            state_store: state_store_size,
            event_cache_store: event_cache_store_size,
            media_store: media_store_size,
        })
    }
3509
3510 /// Get a reference to the client's task monitor, for spawning background
3511 /// tasks.
3512 pub fn task_monitor(&self) -> &TaskMonitor {
3513 &self.inner.task_monitor
3514 }
3515
/// Subscribe to notifications about duplicate key upload errors, which are
/// triggered by requests to /keys/upload.
///
/// Each call returns a new receiver on the client's broadcast channel.
#[cfg(feature = "e2e-encryption")]
pub fn subscribe_to_duplicate_key_upload_errors(
    &self,
) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
    let sender = &self.inner.duplicate_key_upload_error_sender;
    sender.subscribe()
}
3524
/// Look up the record of whether we are waiting for an [MSC4268] key
/// bundle for the given room.
///
/// The lookup is delegated to the base client; its error, if any, is
/// converted into this crate's error type.
///
/// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
#[cfg(feature = "e2e-encryption")]
pub async fn get_pending_key_bundle_details_for_room(
    &self,
    room_id: &RoomId,
) -> Result<Option<RoomPendingKeyBundleDetails>> {
    let base_client = self.base_client();
    let details = base_client.get_pending_key_bundle_details_for_room(room_id).await?;
    Ok(details)
}
3536
3537 /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3538 /// a DM.
3539 pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3540 &self.inner.base_client.dm_room_definition
3541 }
3542}
3543
/// Contains the disk size of the different stores, if known. It won't be
/// available for in-memory stores.
///
/// Every field is `None` when the corresponding size is unknown, which is
/// also what [`Default`] produces for all of them.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreSizes {
    /// The size of the CryptoStore.
    pub crypto_store: Option<usize>,
    /// The size of the StateStore.
    pub state_store: Option<usize>,
    /// The size of the EventCacheStore.
    pub event_cache_store: Option<usize>,
    /// The size of the MediaStore.
    pub media_store: Option<usize>,
}
3557
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper to mark users as tracked by the crypto layer.
    ///
    /// # Panics
    ///
    /// Panics if the client has no `OlmMachine`, or if the crypto layer
    /// fails to update its set of tracked users.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let olm = self.olm_machine().await;
        let olm = olm.as_ref().expect("the client should have an OlmMachine");
        olm.update_tracked_users(user_ids)
            .await
            .expect("updating the tracked users should succeed");
    }
}
3571
3572/// A weak reference to the inner client, useful when trying to get a handle
3573/// on the owning client.
3574#[derive(Clone, Debug)]
3575pub(crate) struct WeakClient {
3576 client: Weak<ClientInner>,
3577}
3578
3579impl WeakClient {
3580 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3581 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3582 Self { client: Arc::downgrade(client) }
3583 }
3584
3585 /// Construct a [`WeakClient`] from a [`Client`].
3586 pub fn from_client(client: &Client) -> Self {
3587 Self::from_inner(&client.inner)
3588 }
3589
3590 /// Attempts to get a [`Client`] from this [`WeakClient`].
3591 pub fn get(&self) -> Option<Client> {
3592 self.client.upgrade().map(|inner| Client { inner })
3593 }
3594
3595 /// Gets the number of strong (`Arc`) pointers still pointing to this
3596 /// client.
3597 #[allow(dead_code)]
3598 pub fn strong_count(&self) -> usize {
3599 self.client.strong_count()
3600 }
3601}
3602
3603/// Information about the state of a room before we joined it.
3604#[derive(Debug, Clone, Default)]
3605struct PreJoinRoomInfo {
3606 /// The user who invited us to the room, if any.
3607 pub inviter: Option<RoomMember>,
3608}
3609
3610// The http mocking library is not supported for wasm32
3611#[cfg(all(test, not(target_family = "wasm")))]
3612pub(crate) mod tests {
3613 use std::{sync::Arc, time::Duration};
3614
3615 use assert_matches::assert_matches;
3616 use assert_matches2::assert_let;
3617 use eyeball::SharedObservable;
3618 use futures_util::{FutureExt, StreamExt, pin_mut};
3619 use js_int::{UInt, uint};
3620 use matrix_sdk_base::{
3621 RoomState,
3622 store::{MemoryStore, StoreConfig},
3623 ttl::TtlValue,
3624 };
3625 use matrix_sdk_test::{
3626 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3627 event_factory::EventFactory,
3628 };
3629 #[cfg(target_family = "wasm")]
3630 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3631
3632 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3633 use ruma::{
3634 RoomId, ServerName, UserId,
3635 api::{
3636 FeatureFlag, MatrixVersion,
3637 client::{
3638 discovery::discover_homeserver::RtcFocusInfo,
3639 room::create_room::v3::Request as CreateRoomRequest,
3640 },
3641 },
3642 assign,
3643 events::{
3644 ignored_user_list::IgnoredUserListEventContent,
3645 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3646 },
3647 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3648 };
3649 use serde_json::json;
3650 use stream_assert::{assert_next_matches, assert_pending};
3651 use tokio::{
3652 spawn,
3653 time::{sleep, timeout},
3654 };
3655 use url::Url;
3656
3657 use super::Client;
3658 use crate::{
3659 Error, TransmissionProgress,
3660 client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3661 config::{RequestConfig, SyncSettings},
3662 futures::SendRequest,
3663 media::MediaError,
3664 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3665 };
3666
3667 #[async_test]
3668 async fn test_account_data() {
3669 let server = MatrixMockServer::new().await;
3670 let client = server.client_builder().build().await;
3671
3672 let f = EventFactory::new();
3673 server
3674 .mock_sync()
3675 .ok_and_run(&client, |builder| {
3676 builder.add_global_account_data(
3677 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3678 );
3679 })
3680 .await;
3681
3682 let content = client
3683 .account()
3684 .account_data::<IgnoredUserListEventContent>()
3685 .await
3686 .unwrap()
3687 .unwrap()
3688 .deserialize()
3689 .unwrap();
3690
3691 assert_eq!(content.ignored_users.len(), 1);
3692 }
3693
3694 #[async_test]
3695 async fn test_successful_discovery() {
3696 // Imagine this is `matrix.org`.
3697 let server = MatrixMockServer::new().await;
3698 let server_url = server.uri();
3699
3700 // Imagine this is `matrix-client.matrix.org`.
3701 let homeserver = MatrixMockServer::new().await;
3702 let homeserver_url = homeserver.uri();
3703
3704 // Imagine Alice has the user ID `@alice:matrix.org`.
3705 let domain = server_url.strip_prefix("http://").unwrap();
3706 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3707
3708 // The `.well-known` is on the server (e.g. `matrix.org`).
3709 server
3710 .mock_well_known()
3711 .ok_with_homeserver_url(&homeserver_url)
3712 .mock_once()
3713 .named("well-known")
3714 .mount()
3715 .await;
3716
3717 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3718 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3719
3720 let client = Client::builder()
3721 .insecure_server_name_no_tls(alice.server_name())
3722 .build()
3723 .await
3724 .unwrap();
3725
3726 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3727 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3728 client.server_versions().await.unwrap();
3729 }
3730
3731 #[async_test]
3732 async fn test_discovery_broken_server() {
3733 let server = MatrixMockServer::new().await;
3734 let server_url = server.uri();
3735 let domain = server_url.strip_prefix("http://").unwrap();
3736 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3737
3738 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3739
3740 assert!(
3741 Client::builder()
3742 .insecure_server_name_no_tls(alice.server_name())
3743 .build()
3744 .await
3745 .is_err(),
3746 "Creating a client from a user ID should fail when the .well-known request fails."
3747 );
3748 }
3749
3750 #[async_test]
3751 async fn test_room_creation() {
3752 let server = MatrixMockServer::new().await;
3753 let client = server.client_builder().build().await;
3754
3755 let f = EventFactory::new().sender(user_id!("@example:localhost"));
3756 server
3757 .mock_sync()
3758 .ok_and_run(&client, |builder| {
3759 builder.add_joined_room(
3760 JoinedRoomBuilder::default()
3761 .add_state_event(
3762 f.member(user_id!("@example:localhost")).display_name("example"),
3763 )
3764 .add_state_event(f.default_power_levels()),
3765 );
3766 })
3767 .await;
3768
3769 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3770 assert_eq!(room.state(), RoomState::Joined);
3771 }
3772
3773 #[async_test]
3774 async fn test_retry_limit_http_requests() {
3775 let server = MatrixMockServer::new().await;
3776 let client = server
3777 .client_builder()
3778 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3779 .build()
3780 .await;
3781
3782 assert!(client.request_config().retry_limit.unwrap() == 4);
3783
3784 server.mock_who_am_i().error500().expect(4).mount().await;
3785
3786 client.whoami().await.unwrap_err();
3787 }
3788
3789 #[async_test]
3790 async fn test_retry_timeout_http_requests() {
3791 // Keep this timeout small so that the test doesn't take long
3792 let retry_timeout = Duration::from_secs(5);
3793 let server = MatrixMockServer::new().await;
3794 let client = server
3795 .client_builder()
3796 .on_builder(|builder| {
3797 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3798 })
3799 .build()
3800 .await;
3801
3802 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3803
3804 server.mock_login().error500().expect(2..).mount().await;
3805
3806 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3807 }
3808
3809 #[async_test]
3810 async fn test_short_retry_initial_http_requests() {
3811 let server = MatrixMockServer::new().await;
3812 let client = server
3813 .client_builder()
3814 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3815 .build()
3816 .await;
3817
3818 server.mock_login().error500().expect(3..).mount().await;
3819
3820 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3821 }
3822
3823 #[async_test]
3824 async fn test_no_retry_http_requests() {
3825 let server = MatrixMockServer::new().await;
3826 let client = server.client_builder().build().await;
3827
3828 server.mock_devices().error500().mock_once().mount().await;
3829
3830 client.devices().await.unwrap_err();
3831 }
3832
3833 #[async_test]
3834 async fn test_set_homeserver() {
3835 let client = MockClientBuilder::new(None).build().await;
3836 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3837
3838 let homeserver = Url::parse("http://example.com/").unwrap();
3839 client.set_homeserver(homeserver.clone());
3840 assert_eq!(client.homeserver(), homeserver);
3841 }
3842
3843 #[async_test]
3844 async fn test_search_user_request() {
3845 let server = MatrixMockServer::new().await;
3846 let client = server.client_builder().build().await;
3847
3848 server.mock_user_directory().ok().mock_once().mount().await;
3849
3850 let response = client.search_users("test", 50).await.unwrap();
3851 assert_eq!(response.results.len(), 1);
3852 let result = &response.results[0];
3853 assert_eq!(result.user_id.to_string(), "@test:example.me");
3854 assert_eq!(result.display_name.clone().unwrap(), "Test");
3855 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3856 assert!(!response.limited);
3857 }
3858
3859 #[async_test]
3860 async fn test_request_unstable_features() {
3861 let server = MatrixMockServer::new().await;
3862 let client = server.client_builder().no_server_versions().build().await;
3863
3864 server
3865 .mock_versions()
3866 .with_feature("org.matrix.e2e_cross_signing", true)
3867 .ok()
3868 .mock_once()
3869 .mount()
3870 .await;
3871
3872 let unstable_features = client.unstable_features().await.unwrap();
3873 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3874 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3875 }
3876
3877 #[async_test]
3878 async fn test_can_homeserver_push_encrypted_event_to_device() {
3879 let server = MatrixMockServer::new().await;
3880 let client = server.client_builder().no_server_versions().build().await;
3881
3882 server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3883
3884 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3885 assert!(msc4028_enabled);
3886 }
3887
3888 #[async_test]
3889 async fn test_recently_visited_rooms() {
3890 // Tracking recently visited rooms requires authentication
3891 let client = MockClientBuilder::new(None).unlogged().build().await;
3892 assert_matches!(
3893 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3894 Err(Error::AuthenticationRequired)
3895 );
3896
3897 let client = MockClientBuilder::new(None).build().await;
3898 let account = client.account();
3899
3900 // We should start off with an empty list
3901 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3902
3903 // Tracking a valid room id should add it to the list
3904 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3905 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3906 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3907
3908 // And the existing list shouldn't be changed
3909 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3910 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3911
3912 // Tracking the same room again shouldn't change the list
3913 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3914 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3915 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3916
3917 // Tracking a second room should add it to the front of the list
3918 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3919 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3920 assert_eq!(
3921 account.get_recently_visited_rooms().await.unwrap(),
3922 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3923 );
3924
3925 // Tracking the first room yet again should move it to the front of the list
3926 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3927 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3928 assert_eq!(
3929 account.get_recently_visited_rooms().await.unwrap(),
3930 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3931 );
3932
3933 // Tracking should be capped at 20
3934 for n in 0..20 {
3935 account
3936 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3937 .await
3938 .unwrap();
3939 }
3940
3941 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3942
3943 // And the initial rooms should've been pushed out
3944 let rooms = account.get_recently_visited_rooms().await.unwrap();
3945 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3946 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3947
3948 // And the last tracked room should be the first
3949 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3950 }
3951
3952 #[async_test]
3953 async fn test_client_no_cycle_with_event_cache() {
3954 let client = MockClientBuilder::new(None).build().await;
3955
3956 // Wait for the init tasks to die.
3957 sleep(Duration::from_secs(1)).await;
3958
3959 let weak_client = WeakClient::from_client(&client);
3960 assert_eq!(weak_client.strong_count(), 1);
3961
3962 {
3963 let room_id = room_id!("!room:example.org");
3964
3965 // Have the client know the room.
3966 let response = SyncResponseBuilder::default()
3967 .add_joined_room(JoinedRoomBuilder::new(room_id))
3968 .build_sync_response();
3969 client.inner.base_client.receive_sync_response(response).await.unwrap();
3970
3971 client.event_cache().subscribe().unwrap();
3972
3973 let (_room_event_cache, _drop_handles) =
3974 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3975 }
3976
3977 drop(client);
3978
3979 // Give a bit of time for background tasks to die.
3980 sleep(Duration::from_secs(1)).await;
3981
3982 // The weak client must be the last reference to the client now.
3983 assert_eq!(weak_client.strong_count(), 0);
3984 let client = weak_client.get();
3985 assert!(
3986 client.is_none(),
3987 "too many strong references to the client: {}",
3988 Arc::strong_count(&client.unwrap().inner)
3989 );
3990 }
3991
3992 #[async_test]
3993 async fn test_supported_versions_caching() {
3994 let server = MatrixMockServer::new().await;
3995
3996 let versions_mock = server
3997 .mock_versions()
3998 .expect_default_access_token()
3999 .with_feature("org.matrix.e2e_cross_signing", true)
4000 .ok()
4001 .named("first versions mock")
4002 .expect(1)
4003 .mount_as_scoped()
4004 .await;
4005
4006 let memory_store = Arc::new(MemoryStore::new());
4007 let client = server
4008 .client_builder()
4009 .no_server_versions()
4010 .on_builder(|builder| {
4011 builder.store_config(
4012 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4013 .state_store(memory_store.clone()),
4014 )
4015 })
4016 .build()
4017 .await;
4018
4019 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4020
4021 // The result was cached.
4022 assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
4023 // This subsequent call hits the in-memory cache.
4024 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4025
4026 drop(client);
4027
4028 let client = server
4029 .client_builder()
4030 .no_server_versions()
4031 .on_builder(|builder| {
4032 builder.store_config(
4033 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4034 .state_store(memory_store.clone()),
4035 )
4036 })
4037 .build()
4038 .await;
4039
4040 // These calls to the new client hit the on-disk cache.
4041 assert!(
4042 client
4043 .unstable_features()
4044 .await
4045 .unwrap()
4046 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
4047 );
4048
4049 let supported = client.supported_versions().await.unwrap();
4050 assert!(supported.versions.contains(&MatrixVersion::V1_0));
4051 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4052
4053 // Then this call hits the in-memory cache.
4054 let supported = client.supported_versions().await.unwrap();
4055 assert!(supported.versions.contains(&MatrixVersion::V1_0));
4056 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4057
4058 drop(versions_mock);
4059
4060 // Now, reset the cache, and observe the endpoint being called again once.
4061 client.reset_supported_versions().await.unwrap();
4062
4063 server.mock_versions().ok().expect(2).named("second versions mock").mount().await;
4064
4065 // Hits network again.
4066 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4067 // Hits in-memory cache again.
4068 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4069 assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4070
4071 // Force an expiry of the data.
4072 let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
4073 let mut ttl_value = TtlValue::new(supported_versions);
4074 ttl_value.expire();
4075 client.inner.caches.supported_versions.set_value(ttl_value);
4076
4077 // Call the method to trigger a cache refresh background task.
4078 client.supported_versions_cached().await.unwrap().unwrap();
4079
4080 // We wait for the task to finish, the endpoint should have been called again.
4081 sleep(Duration::from_secs(1)).await;
4082 assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4083 }
4084
4085 #[async_test]
4086 async fn test_well_known_caching() {
4087 let server = MatrixMockServer::new().await;
4088 let server_url = server.uri();
4089 let domain = server_url.strip_prefix("http://").unwrap();
4090 let server_name = <&ServerName>::try_from(domain).unwrap();
4091 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
4092
4093 let well_known_mock = server
4094 .mock_well_known()
4095 .ok()
4096 .named("well known mock")
4097 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
4098 .mount_as_scoped()
4099 .await;
4100
4101 let memory_store = Arc::new(MemoryStore::new());
4102 let client = Client::builder()
4103 .insecure_server_name_no_tls(server_name)
4104 .store_config(
4105 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4106 .state_store(memory_store.clone()),
4107 )
4108 .build()
4109 .await
4110 .unwrap();
4111
4112 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4113
4114 // This subsequent call hits the in-memory cache.
4115 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4116
4117 drop(client);
4118
4119 let client = server
4120 .client_builder()
4121 .no_server_versions()
4122 .on_builder(|builder| {
4123 builder.store_config(
4124 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4125 .state_store(memory_store.clone()),
4126 )
4127 })
4128 .build()
4129 .await;
4130
4131 // This call to the new client hits the on-disk cache.
4132 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4133
4134 // Then this call hits the in-memory cache.
4135 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4136
4137 drop(well_known_mock);
4138
4139 // Now, reset the cache, and observe the endpoints being called again once.
4140 client.reset_well_known().await.unwrap();
4141
4142 server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
4143
4144 // Hits network again.
4145 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4146 // Hits in-memory cache again.
4147 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4148
4149 // Force an expiry of the data.
4150 let well_known = client.well_known().await;
4151 let mut ttl_value = TtlValue::new(well_known);
4152 ttl_value.expire();
4153 client.inner.caches.well_known.set_value(ttl_value);
4154
4155 // Call the method again to trigger a cache refresh background task.
4156 client.well_known().await;
4157
4158 // We wait for the task to finish, the endpoint should have been called again.
4159 // We need to wait a bit because the first requests using the server name of the
4160 // user will fail, only the requests using the homeserver URL will succeed.
4161 sleep(Duration::from_secs(5)).await;
4162 assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
4163 }
4164
4165 #[async_test]
4166 async fn test_missing_well_known_caching() {
4167 let server = MatrixMockServer::new().await;
4168 let rtc_foci: Vec<RtcFocusInfo> = vec![];
4169
4170 let well_known_mock = server
4171 .mock_well_known()
4172 .error_unrecognized()
4173 .named("first well-known mock")
4174 .expect(1)
4175 .mount_as_scoped()
4176 .await;
4177
4178 let memory_store = Arc::new(MemoryStore::new());
4179 let client = server
4180 .client_builder()
4181 .on_builder(|builder| {
4182 builder.store_config(
4183 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4184 .state_store(memory_store.clone()),
4185 )
4186 })
4187 .build()
4188 .await;
4189
4190 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4191
4192 // This subsequent call hits the in-memory cache.
4193 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4194
4195 drop(client);
4196
4197 let client = server
4198 .client_builder()
4199 .on_builder(|builder| {
4200 builder.store_config(
4201 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4202 .state_store(memory_store.clone()),
4203 )
4204 })
4205 .build()
4206 .await;
4207
4208 // This call to the new client hits the on-disk cache.
4209 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4210
4211 // Then this call hits the in-memory cache.
4212 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4213
4214 drop(well_known_mock);
4215
4216 // Now, reset the cache, and observe the endpoints being called again once.
4217 client.reset_well_known().await.unwrap();
4218
4219 server
4220 .mock_well_known()
4221 .error_unrecognized()
4222 .expect(1)
4223 .named("second well-known mock")
4224 .mount()
4225 .await;
4226
4227 // Hits network again.
4228 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4229 // Hits in-memory cache again.
4230 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4231 }
4232
4233 #[async_test]
4234 async fn test_no_network_doesnt_cause_infinite_retries() {
4235 // We want infinite retries for transient errors.
4236 let client = MockClientBuilder::new(None)
4237 .on_builder(|builder| builder.request_config(RequestConfig::new()))
4238 .build()
4239 .await;
4240
4241 // We don't define a mock server on purpose here, so that the error is really a
4242 // network error.
4243 client.whoami().await.unwrap_err();
4244 }
4245
4246 #[async_test]
4247 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4248 let server = MatrixMockServer::new().await;
4249 let client = server.client_builder().build().await;
4250
4251 let room_id = room_id!("!room:example.org");
4252
4253 server
4254 .mock_sync()
4255 .ok_and_run(&client, |builder| {
4256 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4257 })
4258 .await;
4259
4260 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4261 assert_eq!(room.room_id(), room_id);
4262 }
4263
4264 #[async_test]
4265 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4266 let server = MatrixMockServer::new().await;
4267 let client = server.client_builder().build().await;
4268
4269 let room_id = room_id!("!room:example.org");
4270
4271 let client = Arc::new(client);
4272
4273 // Perform the /sync request with a delay so it starts after the
4274 // `await_room_remote_echo` call has happened
4275 spawn({
4276 let client = client.clone();
4277 async move {
4278 sleep(Duration::from_millis(100)).await;
4279
4280 server
4281 .mock_sync()
4282 .ok_and_run(&client, |builder| {
4283 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4284 })
4285 .await;
4286 }
4287 });
4288
4289 let room =
4290 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4291 assert_eq!(room.room_id(), room_id);
4292 }
4293
4294 #[async_test]
4295 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4296 let client = MockClientBuilder::new(None).build().await;
4297
4298 let room_id = room_id!("!room:example.org");
4299 // Room is not present so the client won't be able to find it. The call will
4300 // timeout.
4301 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4302 }
4303
4304 #[async_test]
4305 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4306 let server = MatrixMockServer::new().await;
4307 let client = server.client_builder().build().await;
4308
4309 server.mock_create_room().ok().mount().await;
4310
4311 // Create a room in the internal store
4312 let room = client
4313 .create_room(assign!(CreateRoomRequest::new(), {
4314 invite: vec![],
4315 is_direct: false,
4316 }))
4317 .await
4318 .unwrap();
4319
4320 // Room is locally present, but not synced, the call will timeout
4321 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4322 .await
4323 .unwrap_err();
4324 }
4325
4326 #[async_test]
4327 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4328 let server = MatrixMockServer::new().await;
4329 let client = server.client_builder().build().await;
4330
4331 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4332
4333 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4334 assert_matches!(ret, Ok(true));
4335 }
4336
4337 #[async_test]
4338 async fn test_is_room_alias_available_if_alias_is_resolved() {
4339 let server = MatrixMockServer::new().await;
4340 let client = server.client_builder().build().await;
4341
4342 server
4343 .mock_room_directory_resolve_alias()
4344 .ok("!some_room_id:matrix.org", Vec::new())
4345 .expect(1)
4346 .mount()
4347 .await;
4348
4349 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4350 assert_matches!(ret, Ok(false));
4351 }
4352
4353 #[async_test]
4354 async fn test_is_room_alias_available_if_error_found() {
4355 let server = MatrixMockServer::new().await;
4356 let client = server.client_builder().build().await;
4357
4358 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4359
4360 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4361 assert_matches!(ret, Err(_));
4362 }
4363
4364 #[async_test]
4365 async fn test_create_room_alias() {
4366 let server = MatrixMockServer::new().await;
4367 let client = server.client_builder().build().await;
4368
4369 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4370
4371 let ret = client
4372 .create_room_alias(
4373 room_alias_id!("#some_alias:matrix.org"),
4374 room_id!("!some_room:matrix.org"),
4375 )
4376 .await;
4377 assert_matches!(ret, Ok(()));
4378 }
4379
4380 #[async_test]
4381 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4382 let server = MatrixMockServer::new().await;
4383 let client = server.client_builder().build().await;
4384
4385 let room_id = room_id!("!a-room:matrix.org");
4386
4387 // Make sure the summary endpoint is called once
4388 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4389
4390 // We create a locally cached invited room
4391 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4392
4393 // And we get a preview, the server endpoint was reached
4394 let preview = client
4395 .get_room_preview(room_id.into(), Vec::new())
4396 .await
4397 .expect("Room preview should be retrieved");
4398
4399 assert_eq!(invited_room.room_id(), preview.room_id);
4400 }
4401
4402 #[async_test]
4403 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4404 let server = MatrixMockServer::new().await;
4405 let client = server.client_builder().build().await;
4406
4407 let room_id = room_id!("!a-room:matrix.org");
4408
4409 // Make sure the summary endpoint is called once
4410 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4411
4412 // We create a locally cached left room
4413 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4414
4415 // And we get a preview, the server endpoint was reached
4416 let preview = client
4417 .get_room_preview(room_id.into(), Vec::new())
4418 .await
4419 .expect("Room preview should be retrieved");
4420
4421 assert_eq!(left_room.room_id(), preview.room_id);
4422 }
4423
4424 #[async_test]
4425 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4426 let server = MatrixMockServer::new().await;
4427 let client = server.client_builder().build().await;
4428
4429 let room_id = room_id!("!a-room:matrix.org");
4430
4431 // Make sure the summary endpoint is called once
4432 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4433
4434 // We create a locally cached knocked room
4435 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4436
4437 // And we get a preview, the server endpoint was reached
4438 let preview = client
4439 .get_room_preview(room_id.into(), Vec::new())
4440 .await
4441 .expect("Room preview should be retrieved");
4442
4443 assert_eq!(knocked_room.room_id(), preview.room_id);
4444 }
4445
4446 #[async_test]
4447 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4448 let server = MatrixMockServer::new().await;
4449 let client = server.client_builder().build().await;
4450
4451 let room_id = room_id!("!a-room:matrix.org");
4452
4453 // Make sure the summary endpoint is not called
4454 server.mock_room_summary().ok(room_id).never().mount().await;
4455
4456 // We create a locally cached joined room
4457 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4458
4459 // And we get a preview, no server endpoint was reached
4460 let preview = client
4461 .get_room_preview(room_id.into(), Vec::new())
4462 .await
4463 .expect("Room preview should be retrieved");
4464
4465 assert_eq!(joined_room.room_id(), preview.room_id);
4466 }
4467
4468 #[async_test]
4469 async fn test_media_preview_config() {
4470 let server = MatrixMockServer::new().await;
4471 let client = server.client_builder().build().await;
4472
4473 server
4474 .mock_sync()
4475 .ok_and_run(&client, |builder| {
4476 builder.add_custom_global_account_data(json!({
4477 "content": {
4478 "media_previews": "private",
4479 "invite_avatars": "off"
4480 },
4481 "type": "m.media_preview_config"
4482 }));
4483 })
4484 .await;
4485
4486 let (initial_value, stream) =
4487 client.account().observe_media_preview_config().await.unwrap();
4488
4489 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4490 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4491 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4492 pin_mut!(stream);
4493 assert_pending!(stream);
4494
4495 server
4496 .mock_sync()
4497 .ok_and_run(&client, |builder| {
4498 builder.add_custom_global_account_data(json!({
4499 "content": {
4500 "media_previews": "off",
4501 "invite_avatars": "on"
4502 },
4503 "type": "m.media_preview_config"
4504 }));
4505 })
4506 .await;
4507
4508 assert_next_matches!(
4509 stream,
4510 MediaPreviewConfigEventContent {
4511 media_previews: Some(MediaPreviews::Off),
4512 invite_avatars: Some(InviteAvatars::On),
4513 ..
4514 }
4515 );
4516 assert_pending!(stream);
4517 }
4518
4519 #[async_test]
4520 async fn test_unstable_media_preview_config() {
4521 let server = MatrixMockServer::new().await;
4522 let client = server.client_builder().build().await;
4523
4524 server
4525 .mock_sync()
4526 .ok_and_run(&client, |builder| {
4527 builder.add_custom_global_account_data(json!({
4528 "content": {
4529 "media_previews": "private",
4530 "invite_avatars": "off"
4531 },
4532 "type": "io.element.msc4278.media_preview_config"
4533 }));
4534 })
4535 .await;
4536
4537 let (initial_value, stream) =
4538 client.account().observe_media_preview_config().await.unwrap();
4539
4540 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4541 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4542 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4543 pin_mut!(stream);
4544 assert_pending!(stream);
4545
4546 server
4547 .mock_sync()
4548 .ok_and_run(&client, |builder| {
4549 builder.add_custom_global_account_data(json!({
4550 "content": {
4551 "media_previews": "off",
4552 "invite_avatars": "on"
4553 },
4554 "type": "io.element.msc4278.media_preview_config"
4555 }));
4556 })
4557 .await;
4558
4559 assert_next_matches!(
4560 stream,
4561 MediaPreviewConfigEventContent {
4562 media_previews: Some(MediaPreviews::Off),
4563 invite_avatars: Some(InviteAvatars::On),
4564 ..
4565 }
4566 );
4567 assert_pending!(stream);
4568 }
4569
4570 #[async_test]
4571 async fn test_media_preview_config_not_found() {
4572 let server = MatrixMockServer::new().await;
4573 let client = server.client_builder().build().await;
4574
4575 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4576
4577 assert!(initial_value.is_none());
4578 }
4579
4580 #[async_test]
4581 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4582 // The default Matrix version we use is 1.11 or higher, so authenticated media
4583 // is supported.
4584 let server = MatrixMockServer::new().await;
4585 let client = server.client_builder().build().await;
4586
4587 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4588
4589 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4590 client.load_or_fetch_max_upload_size().await.unwrap();
4591
4592 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4593 }
4594
4595 #[async_test]
4596 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4597 // The server must advertise support for the stable feature for authenticated
4598 // media support, so we mock the `GET /versions` response.
4599 let server = MatrixMockServer::new().await;
4600 let client = server.client_builder().no_server_versions().build().await;
4601
4602 server
4603 .mock_versions()
4604 .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4605 .with_feature("org.matrix.msc3916.stable", true)
4606 .ok()
4607 .named("versions")
4608 .expect(1)
4609 .mount()
4610 .await;
4611
4612 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4613
4614 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4615 client.load_or_fetch_max_upload_size().await.unwrap();
4616
4617 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4618 }
4619
4620 #[async_test]
4621 async fn test_load_or_fetch_max_upload_size_no_auth() {
4622 // The server must not support Matrix 1.11 or higher for unauthenticated
4623 // media requests, so we mock the `GET /versions` response.
4624 let server = MatrixMockServer::new().await;
4625 let client = server.client_builder().no_server_versions().build().await;
4626
4627 server
4628 .mock_versions()
4629 .with_versions(vec!["v1.1"])
4630 .ok()
4631 .named("versions")
4632 .expect(1)
4633 .mount()
4634 .await;
4635
4636 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4637
4638 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4639 client.load_or_fetch_max_upload_size().await.unwrap();
4640
4641 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4642 }
4643
4644 #[async_test]
4645 async fn test_uploading_a_too_large_media_file() {
4646 let server = MatrixMockServer::new().await;
4647 let client = server.client_builder().build().await;
4648
4649 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4650 client.load_or_fetch_max_upload_size().await.unwrap();
4651 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4652
4653 let data = vec![1, 2];
4654 let upload_request =
4655 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4656 let request = SendRequest {
4657 client: client.clone(),
4658 request: upload_request,
4659 config: None,
4660 send_progress: SharedObservable::new(TransmissionProgress::default()),
4661 };
4662 let media_request = SendMediaUploadRequest::new(request);
4663
4664 let error = media_request.await.err();
4665 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4666 assert_eq!(max, uint!(1));
4667 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4668 }
4669
4670 #[async_test]
4671 async fn test_dont_ignore_timeout_on_first_sync() {
4672 let server = MatrixMockServer::new().await;
4673 let client = server.client_builder().build().await;
4674
4675 server
4676 .mock_sync()
4677 .timeout(Some(Duration::from_secs(30)))
4678 .ok(|_| {})
4679 .mock_once()
4680 .named("sync_with_timeout")
4681 .mount()
4682 .await;
4683
4684 // Call the endpoint once to check the timeout.
4685 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4686
4687 timeout(Duration::from_secs(1), async {
4688 stream.next().await.unwrap().unwrap();
4689 })
4690 .await
4691 .unwrap();
4692 }
4693
4694 #[async_test]
4695 async fn test_ignore_timeout_on_first_sync() {
4696 let server = MatrixMockServer::new().await;
4697 let client = server.client_builder().build().await;
4698
4699 server
4700 .mock_sync()
4701 .timeout(None)
4702 .ok(|_| {})
4703 .mock_once()
4704 .named("sync_no_timeout")
4705 .mount()
4706 .await;
4707 server
4708 .mock_sync()
4709 .timeout(Some(Duration::from_secs(30)))
4710 .ok(|_| {})
4711 .mock_once()
4712 .named("sync_with_timeout")
4713 .mount()
4714 .await;
4715
4716 // Call each version of the endpoint once to check the timeouts.
4717 let mut stream = Box::pin(
4718 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4719 );
4720
4721 timeout(Duration::from_secs(1), async {
4722 stream.next().await.unwrap().unwrap();
4723 stream.next().await.unwrap().unwrap();
4724 })
4725 .await
4726 .unwrap();
4727 }
4728
4729 #[async_test]
4730 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4731 let server = MatrixMockServer::new().await;
4732 let client = server.client_builder().build().await;
4733 // This is the user ID that is inside MemberAdditional.
4734 // Note the confusing username, so we can share
4735 // GlobalAccountDataTestEvent::Direct with the invited test.
4736 let user_id = user_id!("@invited:localhost");
4737
4738 // When we receive a sync response saying "invited" is invited to a DM
4739 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4740 let response = SyncResponseBuilder::default()
4741 .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4742 .add_global_account_data(
4743 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4744 )
4745 .build_sync_response();
4746 client.base_client().receive_sync_response(response).await.unwrap();
4747
4748 // Then get_dm_room finds this room
4749 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4750 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4751 }
4752
4753 #[async_test]
4754 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4755 let server = MatrixMockServer::new().await;
4756 let client = server.client_builder().build().await;
4757 // This is the user ID that is inside MemberInvite
4758 let user_id = user_id!("@invited:localhost");
4759
4760 // When we receive a sync response saying "invited" is invited to a DM
4761 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4762 let response = SyncResponseBuilder::default()
4763 .add_joined_room(
4764 JoinedRoomBuilder::default()
4765 .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4766 )
4767 .add_global_account_data(
4768 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4769 )
4770 .build_sync_response();
4771 client.base_client().receive_sync_response(response).await.unwrap();
4772
4773 // Then get_dm_room finds this room
4774 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4775 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4776 }
4777
4778 #[async_test]
4779 async fn test_get_dm_room_still_finds_left_room() {
4780 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4781 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4782
4783 let server = MatrixMockServer::new().await;
4784 let client = server.client_builder().build().await;
4785 // This is the user ID that is inside MemberAdditional.
4786 // Note the confusing username, so we can share
4787 // GlobalAccountDataTestEvent::Direct with the invited test.
4788 let user_id = user_id!("@invited:localhost");
4789
4790 // When we receive a sync response saying "invited" has left a DM
4791 let f = EventFactory::new().sender(user_id);
4792 let response = SyncResponseBuilder::default()
4793 .add_joined_room(
4794 JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4795 )
4796 .add_global_account_data(
4797 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4798 )
4799 .build_sync_response();
4800 client.base_client().receive_sync_response(response).await.unwrap();
4801
4802 // Then get_dm_room finds this room
4803 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4804 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4805 }
4806
4807 #[async_test]
4808 async fn test_device_exists() {
4809 let server = MatrixMockServer::new().await;
4810 let client = server.client_builder().build().await;
4811
4812 server.mock_get_device().ok().expect(1).mount().await;
4813
4814 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4815 }
4816
4817 #[async_test]
4818 async fn test_device_exists_404() {
4819 let server = MatrixMockServer::new().await;
4820 let client = server.client_builder().build().await;
4821
4822 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4823 }
4824
4825 #[async_test]
4826 async fn test_device_exists_500() {
4827 let server = MatrixMockServer::new().await;
4828 let client = server.client_builder().build().await;
4829
4830 server.mock_get_device().error500().expect(1).mount().await;
4831
4832 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4833 }
4834
4835 #[async_test]
4836 async fn test_fetching_well_known_with_homeserver_url() {
4837 let server = MatrixMockServer::new().await;
4838 let client = server.client_builder().build().await;
4839 server.mock_well_known().ok().mount().await;
4840
4841 assert_matches!(client.fetch_client_well_known().await, Some(_));
4842 }
4843
4844 #[async_test]
4845 async fn test_fetching_well_known_with_server_name() {
4846 let server = MatrixMockServer::new().await;
4847 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4848
4849 server.mock_well_known().ok().mount().await;
4850
4851 let client = MockClientBuilder::new(None)
4852 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4853 .build()
4854 .await;
4855
4856 assert_matches!(client.fetch_client_well_known().await, Some(_));
4857 }
4858
4859 #[async_test]
4860 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4861 let server = MatrixMockServer::new().await;
4862 server.mock_well_known().ok().mount().await;
4863
4864 let user_id =
4865 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4866 let client = MockClientBuilder::new(None)
4867 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4868 .build()
4869 .await;
4870
4871 assert_matches!(client.fetch_client_well_known().await, Some(_));
4872 }
4873}