matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32 DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35 BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36 SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37 SyncOutsideWasm, ThreadingSupport,
38 event_cache::store::EventCacheStoreLock,
39 media::store::MediaStoreLock,
40 store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41 sync::{Notification, RoomUpdates},
42 task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50 api::{
51 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52 client::{
53 account::whoami,
54 alias::{create_alias, delete_alias, get_alias},
55 authenticated_media,
56 device::{self, delete_devices, get_devices, update_device},
57 directory::{get_public_rooms, get_public_rooms_filtered},
58 discovery::{
59 discover_homeserver::{self, RtcFocusInfo},
60 get_supported_versions,
61 },
62 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
63 knock::knock_room,
64 media,
65 membership::{join_room_by_id, join_room_by_id_or_alias},
66 room::create_room,
67 session::login::v3::DiscoveryInfo,
68 sync::sync_events,
69 threads::get_thread_subscriptions_changes,
70 uiaa,
71 user_directory::search_users,
72 },
73 error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
74 path_builder::PathBuilder,
75 },
76 assign,
77 events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
78 push::Ruleset,
79 time::Instant,
80};
81use serde::de::DeserializeOwned;
82use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
83use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
84use url::Url;
85
86use self::{
87 caches::{Cache, CachedValue, ClientCaches},
88 futures::SendRequest,
89};
90use crate::{
91 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
92 Room, SessionTokens, TransmissionProgress,
93 authentication::{
94 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
95 oauth::OAuth,
96 },
97 client::{
98 homeserver_capabilities::HomeserverCapabilities,
99 thread_subscriptions::ThreadSubscriptionCatchup,
100 },
101 config::{RequestConfig, SyncToken},
102 deduplicating_handler::DeduplicatingHandler,
103 error::HttpResult,
104 event_cache::EventCache,
105 event_handler::{
106 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
107 EventHandlerStore, ObservableEventHandler, SyncEvent,
108 },
109 http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
110 latest_events::LatestEvents,
111 live_locations_observer::BeaconInfoUpdate,
112 media::MediaError,
113 notification_settings::NotificationSettings,
114 room::RoomMember,
115 room_preview::RoomPreview,
116 send_queue::{SendQueue, SendQueueData},
117 sliding_sync::Version as SlidingSyncVersion,
118 sync::{RoomUpdate, SyncResponse},
119};
120#[cfg(feature = "e2e-encryption")]
121use crate::{
122 cross_process_lock::CrossProcessLock,
123 encryption::{
124 DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
125 VerificationState,
126 },
127};
128
129mod builder;
130pub(crate) mod caches;
131pub(crate) mod futures;
132pub(crate) mod homeserver_capabilities;
133pub(crate) mod thread_subscriptions;
134
135pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
136#[cfg(feature = "experimental-search")]
137use crate::search_index::SearchIndex;
138
139#[cfg(not(target_family = "wasm"))]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
141#[cfg(target_family = "wasm")]
142type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
143
144#[cfg(not(target_family = "wasm"))]
145type NotificationHandlerFn =
146 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
147#[cfg(target_family = "wasm")]
148type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
149
150/// Enum controlling if a loop running callbacks should continue or abort.
151///
152/// This is mainly used in the [`sync_with_callback`] method, the return value
153/// of the provided callback controls if the sync loop should be exited.
154///
155/// [`sync_with_callback`]: #method.sync_with_callback
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum LoopCtrl {
158 /// Continue running the loop.
159 Continue,
160 /// Break out of the loop.
161 Break,
162}
163
164/// Represents changes that can occur to a `Client`s `Session`.
165#[derive(Debug, Clone, PartialEq)]
166pub enum SessionChange {
167 /// The session's token is no longer valid.
168 UnknownToken(UnknownTokenErrorData),
169 /// The session's tokens have been refreshed.
170 TokensRefreshed,
171}
172
173/// Information about the server vendor obtained from the federation API.
174#[derive(Debug, Clone, PartialEq, Eq)]
175#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
176pub struct ServerVendorInfo {
177 /// The server name.
178 pub server_name: String,
179 /// The server version.
180 pub version: String,
181}
182
183/// An async/await enabled Matrix client.
184///
185/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
186#[derive(Clone)]
187pub struct Client {
188 pub(crate) inner: Arc<ClientInner>,
189}
190
191#[derive(Default)]
192pub(crate) struct ClientLocks {
193 /// Lock ensuring that only a single room may be marked as a DM at once.
194 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
195 /// explanation.
196 pub(crate) mark_as_dm_lock: Mutex<()>,
197
198 /// Lock ensuring that only a single secret store is getting opened at the
199 /// same time.
200 ///
201 /// This is important so we don't accidentally create multiple different new
202 /// default secret storage keys.
203 #[cfg(feature = "e2e-encryption")]
204 pub(crate) open_secret_store_lock: Mutex<()>,
205
206 /// Lock ensuring that we're only storing a single secret at a time.
207 ///
208 /// Take a look at the [`SecretStore::put_secret`] method for a more
209 /// detailed explanation.
210 ///
211 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
212 #[cfg(feature = "e2e-encryption")]
213 pub(crate) store_secret_lock: Mutex<()>,
214
215 /// Lock ensuring that only one method at a time might modify our backup.
216 #[cfg(feature = "e2e-encryption")]
217 pub(crate) backup_modify_lock: Mutex<()>,
218
219 /// Lock ensuring that we're going to attempt to upload backups for a single
220 /// requester.
221 #[cfg(feature = "e2e-encryption")]
222 pub(crate) backup_upload_lock: Mutex<()>,
223
224 /// Handler making sure we only have one group session sharing request in
225 /// flight per room.
226 #[cfg(feature = "e2e-encryption")]
227 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
228
229 /// Lock making sure we're only doing one key claim request at a time.
230 #[cfg(feature = "e2e-encryption")]
231 pub(crate) key_claim_lock: Mutex<()>,
232
233 /// Handler to ensure that only one members request is running at a time,
234 /// given a room.
235 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
236
237 /// Handler to ensure that only one encryption state request is running at a
238 /// time, given a room.
239 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
240
241 /// Deduplicating handler for sending read receipts. The string is an
242 /// internal implementation detail, see [`Self::send_single_receipt`].
243 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
244
245 #[cfg(feature = "e2e-encryption")]
246 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
247
248 /// Latest "generation" of data known by the crypto store.
249 ///
250 /// This is a counter that only increments, set in the database (and can
251 /// wrap). It's incremented whenever some process acquires a lock for the
252 /// first time. *This assumes the crypto store lock is being held, to
253 /// avoid data races on writing to this value in the store*.
254 ///
255 /// The current process will maintain this value in local memory and in the
256 /// DB over time. Observing a different value than the one read in
257 /// memory, when reading from the store indicates that somebody else has
258 /// written into the database under our feet.
259 ///
260 /// TODO: this should live in the `OlmMachine`, since it's information
261 /// related to the lock. As of today (2023-07-28), we blow up the entire
262 /// olm machine when there's a generation mismatch. So storing the
263 /// generation in the olm machine would make the client think there's
264 /// *always* a mismatch, and that's why we need to store the generation
265 /// outside the `OlmMachine`.
266 #[cfg(feature = "e2e-encryption")]
267 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
268}
269
270pub(crate) struct ClientInner {
271 /// All the data related to authentication and authorization.
272 pub(crate) auth_ctx: Arc<AuthCtx>,
273
274 /// The URL of the server.
275 ///
276 /// Not to be confused with the `Self::homeserver`. `server` is usually
277 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
278 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
279 /// homeserver (at the time of writing — 2024-08-28).
280 ///
281 /// This value is optional depending on how the `Client` has been built.
282 /// If it's been built from a homeserver URL directly, we don't know the
283 /// server. However, if the `Client` has been built from a server URL or
284 /// name, then the homeserver has been discovered, and we know both.
285 server: Option<Url>,
286
287 /// The URL of the homeserver to connect to.
288 ///
289 /// This is the URL for the client-server Matrix API.
290 homeserver: StdRwLock<Url>,
291
292 /// The sliding sync version.
293 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
294
295 /// The underlying HTTP client.
296 pub(crate) http_client: HttpClient,
297
298 /// User session data.
299 pub(super) base_client: BaseClient,
300
301 /// Collection of in-memory caches for the [`Client`].
302 pub(crate) caches: ClientCaches,
303
304 /// Collection of locks individual client methods might want to use, either
305 /// to ensure that only a single call to a method happens at once or to
306 /// deduplicate multiple calls to a method.
307 pub(crate) locks: ClientLocks,
308
309 /// The cross-process lock configuration.
310 ///
311 /// The SDK provides cross-process store locks (see
312 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
313 /// [`CrossProcessLockConfig::MultiProcess`] is used.
314 ///
315 /// If multiple `Client`s are running in different processes, this
316 /// value MUST be different for each `Client`.
317 cross_process_lock_config: CrossProcessLockConfig,
318
319 /// A mapping of the times at which the current user sent typing notices,
320 /// keyed by room.
321 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
322
323 /// Event handlers. See `add_event_handler`.
324 pub(crate) event_handlers: EventHandlerStore,
325
326 /// Notification handlers. See `register_notification_handler`.
327 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
328
329 /// The sender-side of channels used to receive room updates.
330 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
331
332 /// The sender-side of a channel used to observe all the room updates of a
333 /// sync response.
334 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
335
336 /// Whether the client should update its homeserver URL with the discovery
337 /// information present in the login response.
338 respect_login_well_known: bool,
339
340 /// An event that can be listened on to wait for a successful sync. The
341 /// event will only be fired if a sync loop is running. Can be used for
342 /// synchronization, e.g. if we send out a request to create a room, we can
343 /// wait for the sync to get the data to fetch a room object from the state
344 /// store.
345 pub(crate) sync_beat: event_listener::Event,
346
347 /// A central cache for events, inactive first.
348 ///
349 /// It becomes active when [`EventCache::subscribe`] is called.
350 pub(crate) event_cache: OnceCell<EventCache>,
351
352 /// End-to-end encryption related state.
353 #[cfg(feature = "e2e-encryption")]
354 pub(crate) e2ee: EncryptionData,
355
356 /// The verification state of our own device.
357 #[cfg(feature = "e2e-encryption")]
358 pub(crate) verification_state: SharedObservable<VerificationState>,
359
360 /// Whether to enable the experimental support for sending and receiving
361 /// encrypted room history on invite, per [MSC4268].
362 ///
363 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
364 #[cfg(feature = "e2e-encryption")]
365 pub(crate) enable_share_history_on_invite: bool,
366
367 /// Data related to the [`SendQueue`].
368 ///
369 /// [`SendQueue`]: crate::send_queue::SendQueue
370 pub(crate) send_queue_data: Arc<SendQueueData>,
371
372 /// The `max_upload_size` value of the homeserver, it contains the max
373 /// request size you can send.
374 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
375
376 /// The entry point to get the [`LatestEvent`] of rooms and threads.
377 ///
378 /// [`LatestEvent`]: crate::latest_event::LatestEvent
379 latest_events: OnceCell<LatestEvents>,
380
381 /// Service handling the catching up of thread subscriptions in the
382 /// background.
383 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
384
385 #[cfg(feature = "experimental-search")]
386 /// Handler for [`RoomIndex`]'s of each room
387 search_index: SearchIndex,
388
389 /// A monitor for background tasks spawned by the client.
390 pub(crate) task_monitor: TaskMonitor,
391
392 /// A sender to notify subscribers about duplicate key upload errors
393 /// triggered by requests to /keys/upload.
394 #[cfg(feature = "e2e-encryption")]
395 pub(crate) duplicate_key_upload_error_sender:
396 broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
397}
398
399impl ClientInner {
400 /// Create a new `ClientInner`.
401 ///
402 /// All the fields passed as parameters here are those that must be cloned
403 /// upon instantiation of a sub-client, e.g. a client specialized for
404 /// notifications.
405 #[allow(clippy::too_many_arguments)]
406 async fn new(
407 auth_ctx: Arc<AuthCtx>,
408 server: Option<Url>,
409 homeserver: Url,
410 sliding_sync_version: SlidingSyncVersion,
411 http_client: HttpClient,
412 base_client: BaseClient,
413 supported_versions: CachedValue<TtlValue<SupportedVersions>>,
414 well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
415 respect_login_well_known: bool,
416 event_cache: OnceCell<EventCache>,
417 send_queue: Arc<SendQueueData>,
418 latest_events: OnceCell<LatestEvents>,
419 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
420 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
421 cross_process_lock_config: CrossProcessLockConfig,
422 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
423 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
424 ) -> Arc<Self> {
425 let caches = ClientCaches {
426 supported_versions: Cache::with_value(supported_versions),
427 well_known: Cache::with_value(well_known),
428 server_metadata: Cache::new(),
429 homeserver_capabilities: Cache::new(),
430 };
431
432 let client = Self {
433 server,
434 homeserver: StdRwLock::new(homeserver),
435 auth_ctx,
436 sliding_sync_version: StdRwLock::new(sliding_sync_version),
437 http_client,
438 base_client,
439 caches,
440 locks: Default::default(),
441 cross_process_lock_config,
442 typing_notice_times: Default::default(),
443 event_handlers: Default::default(),
444 notification_handlers: Default::default(),
445 room_update_channels: Default::default(),
446 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
447 // ballast for all observers to catch up.
448 room_updates_sender: broadcast::Sender::new(32),
449 respect_login_well_known,
450 sync_beat: event_listener::Event::new(),
451 event_cache,
452 send_queue_data: send_queue,
453 latest_events,
454 #[cfg(feature = "e2e-encryption")]
455 e2ee: EncryptionData::new(encryption_settings),
456 #[cfg(feature = "e2e-encryption")]
457 verification_state: SharedObservable::new(VerificationState::Unknown),
458 #[cfg(feature = "e2e-encryption")]
459 enable_share_history_on_invite,
460 server_max_upload_size: Mutex::new(OnceCell::new()),
461 #[cfg(feature = "experimental-search")]
462 search_index: search_index_handler,
463 thread_subscription_catchup,
464 task_monitor: TaskMonitor::new(),
465 #[cfg(feature = "e2e-encryption")]
466 duplicate_key_upload_error_sender: broadcast::channel(1).0,
467 };
468
469 #[allow(clippy::let_and_return)]
470 let client = Arc::new(client);
471
472 #[cfg(feature = "e2e-encryption")]
473 client.e2ee.initialize_tasks(&client);
474
475 let _ = client
476 .event_cache
477 .get_or_init(|| async {
478 EventCache::new(&client, client.base_client.event_cache_store().clone())
479 })
480 .await;
481
482 let _ = client
483 .thread_subscription_catchup
484 .get_or_init(|| async {
485 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
486 })
487 .await;
488
489 client
490 }
491}
492
493#[cfg(not(tarpaulin_include))]
494impl Debug for Client {
495 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
496 write!(fmt, "Client")
497 }
498}
499
500impl Client {
501 /// Create a new [`Client`] that will use the given homeserver.
502 ///
503 /// # Arguments
504 ///
505 /// * `homeserver_url` - The homeserver that the client should connect to.
506 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
507 Self::builder().homeserver_url(homeserver_url).build().await
508 }
509
510 /// Returns a subscriber that publishes an event every time the ignore user
511 /// list changes.
512 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
513 self.inner.base_client.subscribe_to_ignore_user_list_changes()
514 }
515
516 /// Create a new [`ClientBuilder`].
517 pub fn builder() -> ClientBuilder {
518 ClientBuilder::new()
519 }
520
521 pub(crate) fn base_client(&self) -> &BaseClient {
522 &self.inner.base_client
523 }
524
525 /// The underlying HTTP client.
526 pub fn http_client(&self) -> &reqwest::Client {
527 &self.inner.http_client.inner
528 }
529
530 pub(crate) fn locks(&self) -> &ClientLocks {
531 &self.inner.locks
532 }
533
534 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
535 &self.inner.auth_ctx
536 }
537
538 /// The cross-process store lock configuration used by this [`Client`].
539 ///
540 /// The SDK provides cross-process store locks (see
541 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
542 /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
543 /// the value used for all cross-process store locks used by this
544 /// `Client`.
545 pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
546 &self.inner.cross_process_lock_config
547 }
548
549 /// Change the homeserver URL used by this client.
550 ///
551 /// # Arguments
552 ///
553 /// * `homeserver_url` - The new URL to use.
554 fn set_homeserver(&self, homeserver_url: Url) {
555 *self.inner.homeserver.write().unwrap() = homeserver_url;
556 }
557
558 /// Change to a different homeserver and re-resolve well-known.
559 #[cfg(feature = "e2e-encryption")]
560 pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
561 &self,
562 homeserver_url: Url,
563 ) -> Result<()> {
564 self.set_homeserver(homeserver_url);
565 self.reset_well_known().await?;
566 if let Some(well_known) = self.well_known().await {
567 self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
568 }
569 Ok(())
570 }
571
572 /// Retrieves a helper component to access the [`HomeserverCapabilities`]
573 /// supported or disabled by the homeserver.
574 pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
575 HomeserverCapabilities::new(self.clone())
576 }
577
578 /// Get the server vendor information from the federation API.
579 ///
580 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
581 /// both the server's software name and version.
582 ///
583 /// # Examples
584 ///
585 /// ```no_run
586 /// # use matrix_sdk::Client;
587 /// # use url::Url;
588 /// # async {
589 /// # let homeserver = Url::parse("http://example.com")?;
590 /// let client = Client::new(homeserver).await?;
591 ///
592 /// let server_info = client.server_vendor_info(None).await?;
593 /// println!(
594 /// "Server: {}, Version: {}",
595 /// server_info.server_name, server_info.version
596 /// );
597 /// # anyhow::Ok(()) };
598 /// ```
599 #[cfg(feature = "federation-api")]
600 pub async fn server_vendor_info(
601 &self,
602 request_config: Option<RequestConfig>,
603 ) -> HttpResult<ServerVendorInfo> {
604 use ruma::api::federation::discovery::get_server_version;
605
606 let res = self
607 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
608 .await?;
609
610 // Extract server info, using defaults if fields are missing.
611 let server = res.server.unwrap_or_default();
612 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
613 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
614
615 Ok(ServerVendorInfo { server_name: server_name_str, version })
616 }
617
618 /// Get a copy of the default request config.
619 ///
620 /// The default request config is what's used when sending requests if no
621 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
622 /// function with such a parameter.
623 ///
624 /// If the default request config was not customized through
625 /// [`ClientBuilder`] when creating this `Client`, the returned value will
626 /// be equivalent to [`RequestConfig::default()`].
627 pub fn request_config(&self) -> RequestConfig {
628 self.inner.http_client.request_config
629 }
630
631 /// Check whether the client has been activated.
632 ///
633 /// A client is considered active when:
634 ///
635 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
636 /// is logged in,
637 /// 2. Has loaded cached data from storage,
638 /// 3. If encryption is enabled, it also initialized or restored its
639 /// `OlmMachine`.
640 pub fn is_active(&self) -> bool {
641 self.inner.base_client.is_active()
642 }
643
644 /// The server used by the client.
645 ///
646 /// See `Self::server` to learn more.
647 pub fn server(&self) -> Option<&Url> {
648 self.inner.server.as_ref()
649 }
650
651 /// The homeserver of the client.
652 pub fn homeserver(&self) -> Url {
653 self.inner.homeserver.read().unwrap().clone()
654 }
655
656 /// Get the sliding sync version.
657 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
658 self.inner.sliding_sync_version.read().unwrap().clone()
659 }
660
661 /// Override the sliding sync version.
662 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
663 let mut lock = self.inner.sliding_sync_version.write().unwrap();
664 *lock = version;
665 }
666
667 /// Get the Matrix user session meta information.
668 ///
669 /// If the client is currently logged in, this will return a
670 /// [`SessionMeta`] object which contains the user ID and device ID.
671 /// Otherwise it returns `None`.
672 pub fn session_meta(&self) -> Option<&SessionMeta> {
673 self.base_client().session_meta()
674 }
675
676 /// Returns a receiver that gets events for each room info update. To watch
677 /// for new events, use `receiver.resubscribe()`.
678 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
679 self.base_client().room_info_notable_update_receiver()
680 }
681
682 /// Performs a search for users.
683 /// The search is performed case-insensitively on user IDs and display names
684 ///
685 /// # Arguments
686 ///
687 /// * `search_term` - The search term for the search
688 /// * `limit` - The maximum number of results to return. Defaults to 10.
689 ///
690 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
691 pub async fn search_users(
692 &self,
693 search_term: &str,
694 limit: u64,
695 ) -> HttpResult<search_users::v3::Response> {
696 let mut request = search_users::v3::Request::new(search_term.to_owned());
697
698 if let Some(limit) = UInt::new(limit) {
699 request.limit = limit;
700 }
701
702 self.send(request).await
703 }
704
705 /// Get the user id of the current owner of the client.
706 pub fn user_id(&self) -> Option<&UserId> {
707 self.session_meta().map(|s| s.user_id.as_ref())
708 }
709
710 /// Get the device ID that identifies the current session.
711 pub fn device_id(&self) -> Option<&DeviceId> {
712 self.session_meta().map(|s| s.device_id.as_ref())
713 }
714
715 /// Get the current access token for this session.
716 ///
717 /// Will be `None` if the client has not been logged in.
718 pub fn access_token(&self) -> Option<String> {
719 self.auth_ctx().access_token()
720 }
721
722 /// Get the current tokens for this session.
723 ///
724 /// To be notified of changes in the session tokens, use
725 /// [`Client::subscribe_to_session_changes()`] or
726 /// [`Client::set_session_callbacks()`].
727 ///
728 /// Returns `None` if the client has not been logged in.
729 pub fn session_tokens(&self) -> Option<SessionTokens> {
730 self.auth_ctx().session_tokens()
731 }
732
733 /// Access the authentication API used to log in this client.
734 ///
735 /// Will be `None` if the client has not been logged in.
736 pub fn auth_api(&self) -> Option<AuthApi> {
737 match self.auth_ctx().auth_data.get()? {
738 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
739 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
740 }
741 }
742
743 /// Get the whole session info of this client.
744 ///
745 /// Will be `None` if the client has not been logged in.
746 ///
747 /// Can be used with [`Client::restore_session`] to restore a previously
748 /// logged-in session.
749 pub fn session(&self) -> Option<AuthSession> {
750 match self.auth_api()? {
751 AuthApi::Matrix(api) => api.session().map(Into::into),
752 AuthApi::OAuth(api) => api.full_session().map(Into::into),
753 }
754 }
755
756 /// Get a reference to the state store.
757 pub fn state_store(&self) -> &DynStateStore {
758 self.base_client().state_store()
759 }
760
761 /// Get a reference to the event cache store.
762 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
763 self.base_client().event_cache_store()
764 }
765
766 /// Get a reference to the media store.
767 pub fn media_store(&self) -> &MediaStoreLock {
768 self.base_client().media_store()
769 }
770
771 /// Access the native Matrix authentication API with this client.
772 pub fn matrix_auth(&self) -> MatrixAuth {
773 MatrixAuth::new(self.clone())
774 }
775
776 /// Get the account of the current owner of the client.
777 pub fn account(&self) -> Account {
778 Account::new(self.clone())
779 }
780
781 /// Get the encryption manager of the client.
782 #[cfg(feature = "e2e-encryption")]
783 pub fn encryption(&self) -> Encryption {
784 Encryption::new(self.clone())
785 }
786
787 /// Get the media manager of the client.
788 pub fn media(&self) -> Media {
789 Media::new(self.clone())
790 }
791
792 /// Get the pusher manager of the client.
793 pub fn pusher(&self) -> Pusher {
794 Pusher::new(self.clone())
795 }
796
797 /// Access the OAuth 2.0 API of the client.
798 pub fn oauth(&self) -> OAuth {
799 OAuth::new(self.clone())
800 }
801
802 /// Register a handler for a specific event type.
803 ///
804 /// The handler is a function or closure with one or more arguments. The
805 /// first argument is the event itself. All additional arguments are
806 /// "context" arguments: They have to implement [`EventHandlerContext`].
807 /// This trait is named that way because most of the types implementing it
808 /// give additional context about an event: The room it was in, its raw form
809 /// and other similar things. As two exceptions to this,
810 /// [`Client`] and [`EventHandlerHandle`] also implement the
811 /// `EventHandlerContext` trait so you don't have to clone your client
812 /// into the event handler manually and a handler can decide to remove
813 /// itself.
814 ///
815 /// Some context arguments are not universally applicable. A context
816 /// argument that isn't available for the given event type will result in
817 /// the event handler being skipped and an error being logged. The following
818 /// context argument types are only available for a subset of event types:
819 ///
820 /// * [`Room`] is only available for room-specific events, i.e. not for
821 /// events like global account data events or presence events.
822 ///
823 /// You can provide custom context via
824 /// [`add_event_handler_context`](Client::add_event_handler_context) and
825 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
826 /// into the event handler.
827 ///
828 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
829 ///
830 /// # Examples
831 ///
832 /// ```no_run
833 /// use matrix_sdk::{
834 /// deserialized_responses::EncryptionInfo,
835 /// event_handler::Ctx,
836 /// ruma::{
837 /// events::{
838 /// macros::EventContent,
839 /// push_rules::PushRulesEvent,
840 /// room::{
841 /// message::SyncRoomMessageEvent,
842 /// topic::SyncRoomTopicEvent,
843 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
844 /// },
845 /// },
846 /// push::Action,
847 /// Int, MilliSecondsSinceUnixEpoch,
848 /// },
849 /// Client, Room,
850 /// };
851 /// use serde::{Deserialize, Serialize};
852 ///
853 /// # async fn example(client: Client) {
854 /// client.add_event_handler(
855 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
856 /// // Common usage: Room event plus room and client.
857 /// },
858 /// );
859 /// client.add_event_handler(
860 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
861 /// async move {
862 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
863 /// // unencrypted events and events that were decrypted by the SDK.
864 /// }
865 /// },
866 /// );
867 /// client.add_event_handler(
868 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
869 /// async move {
870 /// // A `Vec<Action>` parameter allows you to know which push actions
871 /// // are applicable for an event. For example, an event with
872 /// // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
873 /// // should be highlighted in the timeline.
874 /// }
875 /// },
876 /// );
877 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
878 /// // You can omit any or all arguments after the first.
879 /// });
880 ///
881 /// // Registering a temporary event handler:
882 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
883 /// /* Event handler */
884 /// });
885 /// client.remove_event_handler(handle);
886 ///
887 /// // Registering custom event handler context:
888 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
889 /// struct MyContext {
890 /// number: usize,
891 /// }
892 /// client.add_event_handler_context(MyContext { number: 5 });
893 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
894 /// // Use the context
895 /// });
896 ///
897 /// // This will handle membership events in joined rooms. Invites are special, see below.
898 /// client.add_event_handler(
899 /// |ev: SyncRoomMemberEvent| async move {},
900 /// );
901 ///
902 /// // To handle state events in invited rooms (including invite membership events),
903 /// // `StrippedRoomMemberEvent` should be used.
904 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
905 /// client.add_event_handler(
906 /// |ev: StrippedRoomMemberEvent| async move {},
907 /// );
908 ///
909 /// // Custom events work exactly the same way, you just need to declare
910 /// // the content struct and use the EventContent derive macro on it.
911 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
912 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
913 /// struct TokenEventContent {
914 /// token: String,
915 /// #[serde(rename = "exp")]
916 /// expires_at: MilliSecondsSinceUnixEpoch,
917 /// }
918 ///
919 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
920 /// todo!("Display the token");
921 /// });
922 ///
923 /// // Event handler closures can also capture local variables.
924 /// // Make sure they are cheap to clone though, because they will be cloned
925 /// // every time the closure is called.
926 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
927 ///
928 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
929 /// println!("Calling the handler with identifier {data}");
930 /// });
931 /// # }
932 /// ```
933 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
934 where
935 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
936 H: EventHandler<Ev, Ctx>,
937 {
938 self.add_event_handler_impl(handler, None)
939 }
940
941 /// Register a handler for a specific room, and event type.
942 ///
943 /// This method works the same way as
944 /// [`add_event_handler`][Self::add_event_handler], except that the handler
945 /// will only be called for events in the room with the specified ID. See
946 /// that method for more details on event handler functions.
947 ///
948 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
949 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
950 /// your use case.
951 pub fn add_room_event_handler<Ev, Ctx, H>(
952 &self,
953 room_id: &RoomId,
954 handler: H,
955 ) -> EventHandlerHandle
956 where
957 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
958 H: EventHandler<Ev, Ctx>,
959 {
960 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
961 }
962
963 /// Observe a specific event type.
964 ///
965 /// `Ev` represents the kind of event that will be observed. `Ctx`
966 /// represents the context that will come with the event. It relies on the
967 /// same mechanism as [`Client::add_event_handler`]. The main difference is
968 /// that it returns an [`ObservableEventHandler`] and doesn't require a
969 /// user-defined closure. It is possible to subscribe to the
970 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
971 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
972 /// Ctx)`.
973 ///
974 /// Be careful that only the most recent value can be observed. Subscribers
975 /// are notified when a new value is sent, but there is no guarantee
976 /// that they will see all values.
977 ///
978 /// # Example
979 ///
980 /// Let's see a classical usage:
981 ///
982 /// ```
983 /// use futures_util::StreamExt as _;
984 /// use matrix_sdk::{
985 /// Client, Room,
986 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
987 /// };
988 ///
989 /// # async fn example(client: Client) -> Option<()> {
990 /// let observer =
991 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
992 ///
993 /// let mut subscriber = observer.subscribe();
994 ///
995 /// let (event, (room, push_actions)) = subscriber.next().await?;
996 /// # Some(())
997 /// # }
998 /// ```
999 ///
1000 /// Now let's see how to get several contexts that can be useful for you:
1001 ///
1002 /// ```
1003 /// use matrix_sdk::{
1004 /// Client, Room,
1005 /// deserialized_responses::EncryptionInfo,
1006 /// ruma::{
1007 /// events::room::{
1008 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1009 /// },
1010 /// push::Action,
1011 /// },
1012 /// };
1013 ///
1014 /// # async fn example(client: Client) {
1015 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1016 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1017 ///
1018 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1019 /// // to distinguish between unencrypted events and events that were decrypted
1020 /// // by the SDK.
1021 /// let _ = client
1022 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1023 /// );
1024 ///
1025 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1026 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1027 /// // should be highlighted in the timeline.
1028 /// let _ =
1029 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1030 ///
1031 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1032 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1033 /// # }
1034 /// ```
1035 ///
1036 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1037 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1038 where
1039 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1040 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1041 {
1042 self.observe_room_events_impl(None)
1043 }
1044
1045 /// Observe a specific room, and event type.
1046 ///
1047 /// This method works the same way as [`Client::observe_events`], except
1048 /// that the observability will only be applied for events in the room with
1049 /// the specified ID. See that method for more details.
1050 ///
1051 /// Be careful that only the most recent value can be observed. Subscribers
1052 /// are notified when a new value is sent, but there is no guarantee
1053 /// that they will see all values.
1054 pub fn observe_room_events<Ev, Ctx>(
1055 &self,
1056 room_id: &RoomId,
1057 ) -> ObservableEventHandler<(Ev, Ctx)>
1058 where
1059 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1060 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1061 {
1062 self.observe_room_events_impl(Some(room_id.to_owned()))
1063 }
1064
1065 /// Shared implementation for `Client::observe_events` and
1066 /// `Client::observe_room_events`.
1067 fn observe_room_events_impl<Ev, Ctx>(
1068 &self,
1069 room_id: Option<OwnedRoomId>,
1070 ) -> ObservableEventHandler<(Ev, Ctx)>
1071 where
1072 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1073 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1074 {
1075 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1076 // new value.
1077 let shared_observable = SharedObservable::new(None);
1078
1079 ObservableEventHandler::new(
1080 shared_observable.clone(),
1081 self.event_handler_drop_guard(self.add_event_handler_impl(
1082 move |event: Ev, context: Ctx| {
1083 shared_observable.set(Some((event, context)));
1084
1085 ready(())
1086 },
1087 room_id,
1088 )),
1089 )
1090 }
1091
1092 /// Subscribe to future `beacon_info` updates for the current user across
1093 /// all rooms.
1094 ///
1095 /// This stream is push-only: it emits only future updates observed during
1096 /// sync processing and does not replay existing state.
1097 pub fn observe_own_beacon_info_updates(
1098 &self,
1099 ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1100 let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1101 let mut stream = observer.subscribe();
1102 let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1103 Ok(async_stream::stream! {
1104 let _observer = observer;
1105
1106 while let Some((event, room)) = stream.next().await {
1107 if event.state_key != own_user_id {
1108 continue;
1109 }
1110 yield BeaconInfoUpdate {
1111 room_id: room.room_id().to_owned(),
1112 event_id: event.event_id,
1113 content: event.content,
1114 };
1115 }
1116 })
1117 }
1118
1119 /// Remove the event handler associated with the handle.
1120 ///
1121 /// Note that you **must not** call `remove_event_handler` from the
1122 /// non-async part of an event handler, that is:
1123 ///
1124 /// ```ignore
1125 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1126 /// // ⚠ this will cause a deadlock ⚠
1127 /// client.remove_event_handler(handle);
1128 ///
1129 /// async move {
1130 /// // removing the event handler here is fine
1131 /// client.remove_event_handler(handle);
1132 /// }
1133 /// })
1134 /// ```
1135 ///
1136 /// Note also that handlers that remove themselves will still execute with
1137 /// events received in the same sync cycle.
1138 ///
1139 /// # Arguments
1140 ///
1141 /// `handle` - The [`EventHandlerHandle`] that is returned when
1142 /// registering the event handler with [`Client::add_event_handler`].
1143 ///
1144 /// # Examples
1145 ///
1146 /// ```no_run
1147 /// # use url::Url;
1148 /// # use tokio::sync::mpsc;
1149 /// #
1150 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1151 /// #
1152 /// use matrix_sdk::{
1153 /// Client, event_handler::EventHandlerHandle,
1154 /// ruma::events::room::member::SyncRoomMemberEvent,
1155 /// };
1156 /// #
1157 /// # futures_executor::block_on(async {
1158 /// # let client = matrix_sdk::Client::builder()
1159 /// # .homeserver_url(homeserver)
1160 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1161 /// # .build()
1162 /// # .await
1163 /// # .unwrap();
1164 ///
1165 /// client.add_event_handler(
1166 /// |ev: SyncRoomMemberEvent,
1167 /// client: Client,
1168 /// handle: EventHandlerHandle| async move {
1169 /// // Common usage: Check arriving Event is the expected one
1170 /// println!("Expected RoomMemberEvent received!");
1171 /// client.remove_event_handler(handle);
1172 /// },
1173 /// );
1174 /// # });
1175 /// ```
1176 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1177 self.inner.event_handlers.remove(handle);
1178 }
1179
1180 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1181 /// the given handle.
1182 ///
1183 /// When the returned value is dropped, the event handler will be removed.
1184 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1185 EventHandlerDropGuard::new(handle, self.clone())
1186 }
1187
1188 /// Add an arbitrary value for use as event handler context.
1189 ///
1190 /// The value can be obtained in an event handler by adding an argument of
1191 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1192 ///
1193 /// If a value of the same type has been added before, it will be
1194 /// overwritten.
1195 ///
1196 /// # Examples
1197 ///
1198 /// ```no_run
1199 /// use matrix_sdk::{
1200 /// Room, event_handler::Ctx,
1201 /// ruma::events::room::message::SyncRoomMessageEvent,
1202 /// };
1203 /// # #[derive(Clone)]
1204 /// # struct SomeType;
1205 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1206 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1207 /// # futures_executor::block_on(async {
1208 /// # let client = matrix_sdk::Client::builder()
1209 /// # .homeserver_url(homeserver)
1210 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1211 /// # .build()
1212 /// # .await
1213 /// # .unwrap();
1214 ///
1215 /// // Handle used to send messages to the UI part of the app
1216 /// let my_gui_handle: SomeType = obtain_gui_handle();
1217 ///
1218 /// client.add_event_handler_context(my_gui_handle.clone());
1219 /// client.add_event_handler(
1220 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1221 /// async move {
1222 /// // gui_handle.send(DisplayMessage { message: ev });
1223 /// }
1224 /// },
1225 /// );
1226 /// # });
1227 /// ```
1228 pub fn add_event_handler_context<T>(&self, ctx: T)
1229 where
1230 T: Clone + Send + Sync + 'static,
1231 {
1232 self.inner.event_handlers.add_context(ctx);
1233 }
1234
1235 /// Register a handler for a notification.
1236 ///
1237 /// Similar to [`Client::add_event_handler`], but only allows functions
1238 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1239 /// [`Client`] for now.
1240 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1241 where
1242 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1243 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1244 {
1245 self.inner.notification_handlers.write().await.push(Box::new(
1246 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1247 ));
1248
1249 self
1250 }
1251
1252 /// Subscribe to all updates for the room with the given ID.
1253 ///
1254 /// The returned receiver will receive a new message for each sync response
1255 /// that contains updates for that room.
1256 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1257 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1258 btree_map::Entry::Vacant(entry) => {
1259 let (tx, rx) = broadcast::channel(8);
1260 entry.insert(tx);
1261 rx
1262 }
1263 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1264 }
1265 }
1266
1267 /// Subscribe to all updates to all rooms, whenever any has been received in
1268 /// a sync response.
1269 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1270 self.inner.room_updates_sender.subscribe()
1271 }
1272
1273 pub(crate) async fn notification_handlers(
1274 &self,
1275 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1276 self.inner.notification_handlers.read().await
1277 }
1278
1279 /// Get all the rooms the client knows about.
1280 ///
1281 /// This will return the list of joined, invited, and left rooms.
1282 pub fn rooms(&self) -> Vec<Room> {
1283 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1284 }
1285
1286 /// Get all the rooms the client knows about, filtered by room state.
1287 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1288 self.base_client()
1289 .rooms_filtered(filter)
1290 .into_iter()
1291 .map(|room| Room::new(self.clone(), room))
1292 .collect()
1293 }
1294
1295 /// Get a stream of all the rooms, in addition to the existing rooms.
1296 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1297 let (rooms, stream) = self.base_client().rooms_stream();
1298
1299 let map_room = |room| Room::new(self.clone(), room);
1300
1301 (
1302 rooms.into_iter().map(map_room).collect(),
1303 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1304 )
1305 }
1306
1307 /// Returns the joined rooms this client knows about.
1308 pub fn joined_rooms(&self) -> Vec<Room> {
1309 self.rooms_filtered(RoomStateFilter::JOINED)
1310 }
1311
1312 /// Returns the invited rooms this client knows about.
1313 pub fn invited_rooms(&self) -> Vec<Room> {
1314 self.rooms_filtered(RoomStateFilter::INVITED)
1315 }
1316
1317 /// Returns the left rooms this client knows about.
1318 pub fn left_rooms(&self) -> Vec<Room> {
1319 self.rooms_filtered(RoomStateFilter::LEFT)
1320 }
1321
1322 /// Returns the joined space rooms this client knows about.
1323 pub fn joined_space_rooms(&self) -> Vec<Room> {
1324 self.base_client()
1325 .rooms_filtered(RoomStateFilter::JOINED)
1326 .into_iter()
1327 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1328 .collect()
1329 }
1330
1331 /// Get a room with the given room id.
1332 ///
1333 /// # Arguments
1334 ///
1335 /// `room_id` - The unique id of the room that should be fetched.
1336 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1337 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1338 }
1339
1340 /// Gets the preview of a room, whether the current user has joined it or
1341 /// not.
1342 pub async fn get_room_preview(
1343 &self,
1344 room_or_alias_id: &RoomOrAliasId,
1345 via: Vec<OwnedServerName>,
1346 ) -> Result<RoomPreview> {
1347 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1348 Ok(room_id) => room_id.to_owned(),
1349 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1350 };
1351
1352 if let Some(room) = self.get_room(&room_id) {
1353 // The cached data can only be trusted if the room state is joined or
1354 // banned: for invite and knock rooms, no updates will be received
1355 // for the rooms after the invite/knock action took place so we may
1356 // have very out to date data for important fields such as
1357 // `join_rule`. For left rooms, the homeserver should return the latest info.
1358 match room.state() {
1359 RoomState::Joined | RoomState::Banned => {
1360 return Ok(RoomPreview::from_known_room(&room).await);
1361 }
1362 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1363 }
1364 }
1365
1366 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1367 }
1368
1369 /// Resolve a room alias to a room id and a list of servers which know
1370 /// about it.
1371 ///
1372 /// # Arguments
1373 ///
1374 /// `room_alias` - The room alias to be resolved.
1375 pub async fn resolve_room_alias(
1376 &self,
1377 room_alias: &RoomAliasId,
1378 ) -> HttpResult<get_alias::v3::Response> {
1379 let request = get_alias::v3::Request::new(room_alias.to_owned());
1380 self.send(request).await
1381 }
1382
1383 /// Checks if a room alias is not in use yet.
1384 ///
1385 /// Returns:
1386 /// - `Ok(true)` if the room alias is available.
1387 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1388 /// status code).
1389 /// - An `Err` otherwise.
1390 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1391 match self.resolve_room_alias(alias).await {
1392 // The room alias was resolved, so it's already in use.
1393 Ok(_) => Ok(false),
1394 Err(error) => {
1395 match error.client_api_error_kind() {
1396 // The room alias wasn't found, so it's available.
1397 Some(ErrorKind::NotFound) => Ok(true),
1398 _ => Err(error),
1399 }
1400 }
1401 }
1402 }
1403
1404 /// Adds a new room alias associated with a room to the room directory.
1405 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1406 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1407 self.send(request).await?;
1408 Ok(())
1409 }
1410
1411 /// Removes a room alias from the room directory.
1412 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1413 let request = delete_alias::v3::Request::new(alias.to_owned());
1414 self.send(request).await?;
1415 Ok(())
1416 }
1417
1418 /// Update the homeserver from the login response well-known if needed.
1419 ///
1420 /// # Arguments
1421 ///
1422 /// * `login_well_known` - The `well_known` field from a successful login
1423 /// response.
1424 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1425 if self.inner.respect_login_well_known
1426 && let Some(well_known) = login_well_known
1427 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1428 {
1429 self.set_homeserver(homeserver);
1430 }
1431 }
1432
1433 /// Similar to [`Client::restore_session_with`], with
1434 /// [`RoomLoadSettings::default()`].
1435 ///
1436 /// # Panics
1437 ///
1438 /// Panics if a session was already restored or logged in.
1439 #[instrument(skip_all)]
1440 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1441 self.restore_session_with(session, RoomLoadSettings::default()).await
1442 }
1443
1444 /// Restore a session previously logged-in using one of the available
1445 /// authentication APIs. The number of rooms to restore is controlled by
1446 /// [`RoomLoadSettings`].
1447 ///
1448 /// See the documentation of the corresponding authentication API's
1449 /// `restore_session` method for more information.
1450 ///
1451 /// # Panics
1452 ///
1453 /// Panics if a session was already restored or logged in.
1454 #[instrument(skip_all)]
1455 pub async fn restore_session_with(
1456 &self,
1457 session: impl Into<AuthSession>,
1458 room_load_settings: RoomLoadSettings,
1459 ) -> Result<()> {
1460 let session = session.into();
1461 match session {
1462 AuthSession::Matrix(session) => {
1463 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1464 }
1465 AuthSession::OAuth(session) => {
1466 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1467 }
1468 }
1469 }
1470
1471 /// Refresh the access token using the authentication API used to log into
1472 /// this session.
1473 ///
1474 /// See the documentation of the authentication API's `refresh_access_token`
1475 /// method for more information.
1476 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1477 let Some(auth_api) = self.auth_api() else {
1478 return Err(RefreshTokenError::RefreshTokenRequired);
1479 };
1480
1481 match auth_api {
1482 AuthApi::Matrix(api) => {
1483 trace!("Token refresh: Using the homeserver.");
1484 Box::pin(api.refresh_access_token()).await?;
1485 }
1486 AuthApi::OAuth(api) => {
1487 trace!("Token refresh: Using OAuth 2.0.");
1488 Box::pin(api.refresh_access_token()).await?;
1489 }
1490 }
1491
1492 Ok(())
1493 }
1494
1495 /// Log out the current session using the proper authentication API.
1496 ///
1497 /// # Errors
1498 ///
1499 /// Returns an error if the session is not authenticated or if an error
1500 /// occurred while making the request to the server.
1501 pub async fn logout(&self) -> Result<(), Error> {
1502 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1503 match auth_api {
1504 AuthApi::Matrix(matrix_auth) => {
1505 matrix_auth.logout().await?;
1506 Ok(())
1507 }
1508 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1509 }
1510 }
1511
1512 /// Get or upload a sync filter.
1513 ///
1514 /// This method will either get a filter ID from the store or upload the
1515 /// filter definition to the homeserver and return the new filter ID.
1516 ///
1517 /// # Arguments
1518 ///
1519 /// * `filter_name` - The unique name of the filter, this name will be used
1520 /// locally to store and identify the filter ID returned by the server.
1521 ///
1522 /// * `definition` - The filter definition that should be uploaded to the
1523 /// server if no filter ID can be found in the store.
1524 ///
1525 /// # Examples
1526 ///
1527 /// ```no_run
1528 /// # use matrix_sdk::{
1529 /// # Client, config::SyncSettings,
1530 /// # ruma::api::client::{
1531 /// # filter::{
1532 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1533 /// # },
1534 /// # sync::sync_events::v3::Filter,
1535 /// # }
1536 /// # };
1537 /// # use url::Url;
1538 /// # async {
1539 /// # let homeserver = Url::parse("http://example.com").unwrap();
1540 /// # let client = Client::new(homeserver).await.unwrap();
1541 /// let mut filter = FilterDefinition::default();
1542 ///
1543 /// // Let's enable member lazy loading.
1544 /// filter.room.state.lazy_load_options =
1545 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1546 ///
1547 /// let filter_id = client
1548 /// .get_or_upload_filter("sync", filter)
1549 /// .await
1550 /// .unwrap();
1551 ///
1552 /// let sync_settings = SyncSettings::new()
1553 /// .filter(Filter::FilterId(filter_id));
1554 ///
1555 /// let response = client.sync_once(sync_settings).await.unwrap();
1556 /// # };
1557 #[instrument(skip(self, definition))]
1558 pub async fn get_or_upload_filter(
1559 &self,
1560 filter_name: &str,
1561 definition: FilterDefinition,
1562 ) -> Result<String> {
1563 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1564 debug!("Found filter locally");
1565 Ok(filter)
1566 } else {
1567 debug!("Didn't find filter locally");
1568 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1569 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1570 let response = self.send(request).await?;
1571
1572 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1573
1574 Ok(response.filter_id)
1575 }
1576 }
1577
1578 /// Prepare to join a room by ID, by getting the current details about it
1579 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1580 let room = self.get_room(room_id)?;
1581
1582 let inviter = match room.invite_details().await {
1583 Ok(details) => details.inviter,
1584 Err(Error::WrongRoomState(_)) => None,
1585 Err(e) => {
1586 warn!("Error fetching invite details for room: {e:?}");
1587 None
1588 }
1589 };
1590
1591 Some(PreJoinRoomInfo { inviter })
1592 }
1593
1594 /// Finish joining a room.
1595 ///
1596 /// If the room was an invite that should be marked as a DM, will include it
1597 /// in the DM event after creating the joined room.
1598 ///
1599 /// If encrypted history sharing is enabled, will check to see if we have a
1600 /// key bundle, and import it if so.
1601 ///
1602 /// # Arguments
1603 ///
1604 /// * `room_id` - The `RoomId` of the room that was joined.
1605 /// * `pre_join_room_info` - Information about the room before we joined.
1606 async fn finish_join_room(
1607 &self,
1608 room_id: &RoomId,
1609 pre_join_room_info: Option<PreJoinRoomInfo>,
1610 ) -> Result<Room> {
1611 info!(?room_id, ?pre_join_room_info, "Completing room join");
1612 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1613 room.state() == RoomState::Invited
1614 && room.is_direct().await.unwrap_or_else(|e| {
1615 warn!(%room_id, "is_direct() failed: {e}");
1616 false
1617 })
1618 } else {
1619 false
1620 };
1621
1622 let base_room = self
1623 .base_client()
1624 .room_joined(
1625 room_id,
1626 pre_join_room_info
1627 .as_ref()
1628 .and_then(|info| info.inviter.as_ref())
1629 .map(|i| i.user_id().to_owned()),
1630 )
1631 .await?;
1632 let room = Room::new(self.clone(), base_room);
1633
1634 if mark_as_dm {
1635 room.set_is_direct(true).await?;
1636 }
1637
1638 // If we joined following an invite, check if we had previously received a key
1639 // bundle from the inviter, and import it if so.
1640 //
1641 // It's important that we only do this once `BaseClient::room_joined` has
1642 // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1643 // race.
1644 #[cfg(feature = "e2e-encryption")]
1645 if self.inner.enable_share_history_on_invite
1646 && let Some(inviter) =
1647 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1648 {
1649 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1650 .await?;
1651 }
1652
1653 // Suppress "unused variable" and "unused field" lints
1654 #[cfg(not(feature = "e2e-encryption"))]
1655 let _ = pre_join_room_info.map(|i| i.inviter);
1656
1657 Ok(room)
1658 }
1659
1660 /// Join a room by `RoomId`.
1661 ///
1662 /// Returns the `Room` in the joined state.
1663 ///
1664 /// # Arguments
1665 ///
1666 /// * `room_id` - The `RoomId` of the room to be joined.
1667 #[instrument(skip(self))]
1668 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1669 // See who invited us to this room, if anyone. Note we have to do this before
1670 // making the `/join` request, otherwise we could race against the sync.
1671 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1672
1673 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1674 let response = self.send(request).await?;
1675 self.finish_join_room(&response.room_id, pre_join_info).await
1676 }
1677
1678 /// Join a room by `RoomOrAliasId`.
1679 ///
1680 /// Returns the `Room` in the joined state.
1681 ///
1682 /// # Arguments
1683 ///
1684 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1685 /// alias looks like `#name:example.com`.
1686 /// * `server_names` - The server names to be used for resolving the alias,
1687 /// if needs be.
1688 #[instrument(skip(self))]
1689 pub async fn join_room_by_id_or_alias(
1690 &self,
1691 alias: &RoomOrAliasId,
1692 server_names: &[OwnedServerName],
1693 ) -> Result<Room> {
1694 let pre_join_info = {
1695 match alias.try_into() {
1696 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1697 Err(_) => {
1698 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1699 // responding to an invitation to the room, and therefore don't need to handle
1700 // things that happen as a result of invites.
1701 None
1702 }
1703 }
1704 };
1705 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1706 via: server_names.to_owned(),
1707 });
1708 let response = self.send(request).await?;
1709 self.finish_join_room(&response.room_id, pre_join_info).await
1710 }
1711
1712 /// Search the homeserver's directory of public rooms.
1713 ///
1714 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1715 /// a `get_public_rooms::Response`.
1716 ///
1717 /// # Arguments
1718 ///
1719 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1720 ///
1721 /// * `since` - Pagination token from a previous request.
1722 ///
1723 /// * `server` - The name of the server, if `None` the requested server is
1724 /// used.
1725 ///
1726 /// # Examples
1727 /// ```no_run
1728 /// use matrix_sdk::Client;
1729 /// # use url::Url;
1730 /// # let homeserver = Url::parse("http://example.com").unwrap();
1731 /// # let limit = Some(10);
1732 /// # let since = Some("since token");
1733 /// # let server = Some("servername.com".try_into().unwrap());
1734 /// # async {
1735 /// let mut client = Client::new(homeserver).await.unwrap();
1736 ///
1737 /// client.public_rooms(limit, since, server).await;
1738 /// # };
1739 /// ```
1740 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1741 pub async fn public_rooms(
1742 &self,
1743 limit: Option<u32>,
1744 since: Option<&str>,
1745 server: Option<&ServerName>,
1746 ) -> HttpResult<get_public_rooms::v3::Response> {
1747 let limit = limit.map(UInt::from);
1748
1749 let request = assign!(get_public_rooms::v3::Request::new(), {
1750 limit,
1751 since: since.map(ToOwned::to_owned),
1752 server: server.map(ToOwned::to_owned),
1753 });
1754 self.send(request).await
1755 }
1756
1757 /// Create a room with the given parameters.
1758 ///
1759 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1760 /// created room.
1761 ///
1762 /// If you want to create a direct message with one specific user, you can
1763 /// use [`create_dm`][Self::create_dm], which is more convenient than
1764 /// assembling the [`create_room::v3::Request`] yourself.
1765 ///
1766 /// If the `is_direct` field of the request is set to `true` and at least
1767 /// one user is invited, the room will be automatically added to the direct
1768 /// rooms in the account data.
1769 ///
1770 /// # Examples
1771 ///
1772 /// ```no_run
1773 /// use matrix_sdk::{
1774 /// Client,
1775 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1776 /// };
1777 /// # use url::Url;
1778 /// #
1779 /// # async {
1780 /// # let homeserver = Url::parse("http://example.com").unwrap();
1781 /// let request = CreateRoomRequest::new();
1782 /// let client = Client::new(homeserver).await.unwrap();
1783 /// assert!(client.create_room(request).await.is_ok());
1784 /// # };
1785 /// ```
1786 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1787 let invite = request.invite.clone();
1788 let is_direct_room = request.is_direct;
1789 let response = self.send(request).await?;
1790
1791 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1792
1793 let joined_room = Room::new(self.clone(), base_room);
1794
1795 if is_direct_room
1796 && !invite.is_empty()
1797 && let Err(error) =
1798 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1799 {
1800 // FIXME: Retry in the background
1801 error!("Failed to mark room as DM: {error}");
1802 }
1803
1804 Ok(joined_room)
1805 }
1806
1807 /// Create a DM room.
1808 ///
1809 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1810 /// given user being invited, the room marked `is_direct` and both the
1811 /// creator and invitee getting the default maximum power level.
1812 ///
1813 /// If the `e2e-encryption` feature is enabled, the room will also be
1814 /// encrypted.
1815 ///
1816 /// # Arguments
1817 ///
1818 /// * `user_id` - The ID of the user to create a DM for.
1819 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1820 #[cfg(feature = "e2e-encryption")]
1821 let initial_state = vec![
1822 InitialStateEvent::with_empty_state_key(
1823 RoomEncryptionEventContent::with_recommended_defaults(),
1824 )
1825 .to_raw_any(),
1826 ];
1827
1828 #[cfg(not(feature = "e2e-encryption"))]
1829 let initial_state = vec![];
1830
1831 let request = assign!(create_room::v3::Request::new(), {
1832 invite: vec![user_id.to_owned()],
1833 is_direct: true,
1834 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1835 initial_state,
1836 });
1837
1838 self.create_room(request).await
1839 }
1840
1841 /// Get the first existing DM room with the given user, if any.
1842 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1843 self.get_dm_rooms(user_id).next()
1844 }
1845
1846 /// Get an iterator with the existing DM rooms for the given user.
1847 pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1848 let rooms = self.joined_rooms();
1849
1850 let dm_definition = &self.base_client().dm_room_definition;
1851
1852 // Find the room we share with the `user_id` and only with `user_id`
1853 let rooms = rooms.into_iter().filter(move |r| {
1854 let targets = r.direct_targets();
1855 let targets_match =
1856 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1857 match dm_definition {
1858 DmRoomDefinition::MatrixSpec => targets_match,
1859 DmRoomDefinition::TwoMembers => {
1860 let service_members_count =
1861 r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1862 let active_non_service_members =
1863 r.active_members_count().saturating_sub(service_members_count);
1864 targets_match && active_non_service_members <= 2
1865 }
1866 }
1867 });
1868
1869 trace!(?user_id, ?rooms, "Found DM rooms with user");
1870 rooms
1871 }
1872
1873 /// Search the homeserver's directory for public rooms with a filter.
1874 ///
1875 /// # Arguments
1876 ///
1877 /// * `room_search` - The easiest way to create this request is using the
1878 /// `get_public_rooms_filtered::Request` itself.
1879 ///
1880 /// # Examples
1881 ///
1882 /// ```no_run
1883 /// # use url::Url;
1884 /// # use matrix_sdk::Client;
1885 /// # async {
1886 /// # let homeserver = Url::parse("http://example.com")?;
1887 /// use matrix_sdk::ruma::{
1888 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1889 /// };
1890 /// # let mut client = Client::new(homeserver).await?;
1891 ///
1892 /// let mut filter = Filter::new();
1893 /// filter.generic_search_term = Some("rust".to_owned());
1894 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1895 /// request.filter = filter;
1896 ///
1897 /// let response = client.public_rooms_filtered(request).await?;
1898 ///
1899 /// for room in response.chunk {
1900 /// println!("Found room {room:?}");
1901 /// }
1902 /// # anyhow::Ok(()) };
1903 /// ```
1904 pub async fn public_rooms_filtered(
1905 &self,
1906 request: get_public_rooms_filtered::v3::Request,
1907 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1908 self.send(request).await
1909 }
1910
1911 /// Send an arbitrary request to the server, without updating client state.
1912 ///
1913 /// **Warning:** Because this method *does not* update the client state, it
1914 /// is important to make sure that you account for this yourself, and
1915 /// use wrapper methods where available. This method should *only* be
1916 /// used if a wrapper method for the endpoint you'd like to use is not
1917 /// available.
1918 ///
1919 /// # Arguments
1920 ///
1921 /// * `request` - A filled out and valid request for the endpoint to be hit
1922 ///
1923 /// * `timeout` - An optional request timeout setting, this overrides the
1924 /// default request setting if one was set.
1925 ///
1926 /// # Examples
1927 ///
1928 /// ```no_run
1929 /// # use matrix_sdk::{Client, config::SyncSettings};
1930 /// # use url::Url;
1931 /// # async {
1932 /// # let homeserver = Url::parse("http://localhost:8080")?;
1933 /// # let mut client = Client::new(homeserver).await?;
1934 /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1935 ///
1936 /// // First construct the request you want to make
1937 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1938 /// // for all available Endpoints
1939 /// let user_id = owned_user_id!("@example:localhost");
1940 /// let request = profile::get_profile::v3::Request::new(user_id);
1941 ///
1942 /// // Start the request using Client::send()
1943 /// let response = client.send(request).await?;
1944 ///
1945 /// // Check the corresponding Response struct to find out what types are
1946 /// // returned
1947 /// # anyhow::Ok(()) };
1948 /// ```
1949 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1950 where
1951 Request: OutgoingRequest + Clone + Debug,
1952 Request::Authentication: SupportedAuthScheme,
1953 Request::PathBuilder: SupportedPathBuilder,
1954 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1955 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1956 {
1957 SendRequest {
1958 client: self.clone(),
1959 request,
1960 config: None,
1961 send_progress: Default::default(),
1962 }
1963 }
1964
1965 pub(crate) async fn send_inner<Request>(
1966 &self,
1967 request: Request,
1968 config: Option<RequestConfig>,
1969 send_progress: SharedObservable<TransmissionProgress>,
1970 ) -> HttpResult<Request::IncomingResponse>
1971 where
1972 Request: OutgoingRequest + Debug,
1973 Request::Authentication: SupportedAuthScheme,
1974 Request::PathBuilder: SupportedPathBuilder,
1975 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1976 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1977 {
1978 let homeserver = self.homeserver().to_string();
1979 let access_token = self.access_token();
1980 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1981
1982 let path_builder_input =
1983 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1984
1985 let result = self
1986 .inner
1987 .http_client
1988 .send(
1989 request,
1990 config,
1991 homeserver,
1992 access_token.as_deref(),
1993 path_builder_input,
1994 send_progress,
1995 )
1996 .await;
1997
1998 if let Err(Some(ErrorKind::UnknownToken { .. })) =
1999 result.as_ref().map_err(HttpError::client_api_error_kind)
2000 && let Some(access_token) = &access_token
2001 {
2002 // Mark the access token as expired.
2003 self.auth_ctx().set_access_token_expired(access_token);
2004 }
2005
2006 result
2007 }
2008
2009 fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2010 _ = self
2011 .inner
2012 .auth_ctx
2013 .session_change_sender
2014 .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2015 }
2016
2017 /// Fetches server versions from network; no caching.
2018 pub async fn fetch_server_versions(
2019 &self,
2020 request_config: Option<RequestConfig>,
2021 ) -> HttpResult<get_supported_versions::Response> {
2022 // Since this was called by the user, try to refresh the access token if
2023 // necessary.
2024 self.fetch_server_versions_inner(false, request_config).await
2025 }
2026
2027 /// Fetches server versions from network; no caching.
2028 ///
2029 /// If the access token is expired and `failsafe` is `false`, this will
2030 /// attempt to refresh the access token, otherwise this will try to make an
2031 /// unauthenticated request instead.
2032 pub(crate) async fn fetch_server_versions_inner(
2033 &self,
2034 failsafe: bool,
2035 request_config: Option<RequestConfig>,
2036 ) -> HttpResult<get_supported_versions::Response> {
2037 if !failsafe {
2038 // `Client::send()` handles refreshing access tokens.
2039 return self
2040 .send(get_supported_versions::Request::new())
2041 .with_request_config(request_config)
2042 .await;
2043 }
2044
2045 let homeserver = self.homeserver().to_string();
2046
2047 // If we have a fresh access token, try with it first.
2048 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2049 && self.auth_ctx().has_valid_access_token()
2050 && let Some(access_token) = self.access_token()
2051 {
2052 let result = self
2053 .inner
2054 .http_client
2055 .send(
2056 get_supported_versions::Request::new(),
2057 request_config,
2058 homeserver.clone(),
2059 Some(&access_token),
2060 (),
2061 Default::default(),
2062 )
2063 .await;
2064
2065 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2066 result.as_ref().map_err(HttpError::client_api_error_kind)
2067 {
2068 // If the access token is actually expired, mark it as expired and fallback to
2069 // the unauthenticated request below.
2070 self.auth_ctx().set_access_token_expired(&access_token);
2071 } else {
2072 // If the request succeeded or it's an other error, just stop now.
2073 return result;
2074 }
2075 }
2076
2077 // Try without authentication.
2078 self.inner
2079 .http_client
2080 .send(
2081 get_supported_versions::Request::new(),
2082 request_config,
2083 homeserver.clone(),
2084 None,
2085 (),
2086 Default::default(),
2087 )
2088 .await
2089 }
2090
2091 /// Fetches client well_known from network; no caching.
2092 ///
2093 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2094 /// well-known contents.
2095 /// 2. If it's not, we try extracting the server name from the
2096 /// [`Client::user_id`] and building the server URL from it.
2097 /// 3. If we couldn't get the well-known contents with either the explicit
2098 /// server name or the implicit extracted one, we try the homeserver URL
2099 /// as a last resort.
2100 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2101 let homeserver = self.homeserver();
2102 let scheme = homeserver.scheme();
2103
2104 // Use the server name, either an explicit one or an implicit one taken from
2105 // the user id: sometimes we'll have only the homeserver url available and no
2106 // server name, but the server name can be extracted from the current user id.
2107 let server_url = self
2108 .server()
2109 .map(|server| server.to_string())
2110 // If the server name wasn't available, extract it from the user id and build a URL:
2111 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2112 // will be the same for the public server url, lacking a better candidate.
2113 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2114
2115 // If the server name is available, first try using it
2116 let response = if let Some(server_url) = server_url {
2117 // First try using the server name
2118 self.fetch_client_well_known_with_url(server_url).await
2119 } else {
2120 None
2121 };
2122
2123 // If we didn't get a well-known value yet, try with the homeserver url instead:
2124 if response.is_none() {
2125 // Sometimes people configure their well-known directly on the homeserver so use
2126 // this as a fallback when the server name is unknown.
2127 warn!(
2128 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2129 );
2130 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2131 } else {
2132 response
2133 }
2134 }
2135
2136 async fn fetch_client_well_known_with_url(
2137 &self,
2138 url: String,
2139 ) -> Option<discover_homeserver::Response> {
2140 let well_known = self
2141 .inner
2142 .http_client
2143 .send(
2144 discover_homeserver::Request::new(),
2145 Some(RequestConfig::short_retry()),
2146 url,
2147 None,
2148 (),
2149 Default::default(),
2150 )
2151 .await;
2152
2153 match well_known {
2154 Ok(well_known) => Some(well_known),
2155 Err(http_error) => {
2156 // It is perfectly valid to not have a well-known file.
2157 // Maybe we should check for a specific error code to be sure?
2158 warn!("Failed to fetch client well-known: {http_error}");
2159 None
2160 }
2161 }
2162 }
2163
2164 /// Load supported versions from storage, or fetch them from network and
2165 /// cache them.
2166 ///
2167 /// If `failsafe` is true, this will try to minimize side effects to avoid
2168 /// possible deadlocks.
2169 async fn fetch_supported_versions(
2170 &self,
2171 failsafe: bool,
2172 ) -> HttpResult<SupportedVersionsResponse> {
2173 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2174 let supported_versions = SupportedVersionsResponse {
2175 versions: server_versions.versions,
2176 unstable_features: server_versions.unstable_features,
2177 };
2178
2179 Ok(supported_versions)
2180 }
2181
2182 /// Get the Matrix versions and features supported by the homeserver by
2183 /// fetching them from the server or the cache.
2184 ///
2185 /// This is equivalent to calling both [`Client::server_versions()`] and
2186 /// [`Client::unstable_features()`]. To always fetch the result from the
2187 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2188 /// and then `.as_supported_versions()` on the response.
2189 ///
2190 /// # Examples
2191 ///
2192 /// ```no_run
2193 /// use ruma::api::{FeatureFlag, MatrixVersion};
2194 /// # use matrix_sdk::{Client, config::SyncSettings};
2195 /// # use url::Url;
2196 /// # async {
2197 /// # let homeserver = Url::parse("http://localhost:8080")?;
2198 /// # let mut client = Client::new(homeserver).await?;
2199 ///
2200 /// let supported = client.supported_versions().await?;
2201 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2202 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2203 ///
2204 /// let msc_x_feature = FeatureFlag::from("msc_x");
2205 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2206 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2207 /// # anyhow::Ok(()) };
2208 /// ```
2209 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2210 self.supported_versions_inner(false).await
2211 }
2212
2213 /// Get the Matrix versions and features supported by the homeserver by
2214 /// fetching them from the server or the cache.
2215 ///
2216 /// If `failsafe` is true, this will try to minimize side effects to avoid
2217 /// possible deadlocks.
2218 pub(crate) async fn supported_versions_inner(
2219 &self,
2220 failsafe: bool,
2221 ) -> HttpResult<SupportedVersions> {
2222 match self.supported_versions_cached_inner(failsafe).await {
2223 Ok(Some(value)) => {
2224 return Ok(value);
2225 }
2226 Ok(None) => {
2227 // The cache is empty, make a request.
2228 }
2229 Err(error) => {
2230 warn!("error when loading cached supported versions: {error}");
2231 // Fallthrough to make a request.
2232 }
2233 }
2234
2235 self.refresh_supported_versions_cache(failsafe).await
2236 }
2237
2238 /// Refresh the Matrix versions and features supported by the homeserver in
2239 /// the cache.
2240 ///
2241 /// If `failsafe` is true, this will try to minimize side effects to avoid
2242 /// possible deadlocks.
2243 async fn refresh_supported_versions_cache(
2244 &self,
2245 failsafe: bool,
2246 ) -> HttpResult<SupportedVersions> {
2247 let cached_supported_versions = &self.inner.caches.supported_versions;
2248
2249 let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
2250 Ok(guard) => guard,
2251 Err(_) => {
2252 // There is already a refresh in progress, wait for it to finish.
2253 let guard = cached_supported_versions.refresh_lock.lock().await;
2254
2255 if let Err(error) = guard.as_ref() {
2256 // There was an error in the previous refresh, return it.
2257 return Err(HttpError::Cached(error.clone()));
2258 }
2259
2260 // Reuse the data if it was cached and it hasn't expired.
2261 if let CachedValue::Cached(value) = cached_supported_versions.value()
2262 && !value.has_expired()
2263 {
2264 return Ok(value.into_data());
2265 }
2266
2267 // The data wasn't cached or has expired, we need to make another request.
2268 guard
2269 }
2270 };
2271
2272 let response = match self.fetch_supported_versions(failsafe).await {
2273 Ok(response) => {
2274 *supported_versions_guard = Ok(());
2275 TtlValue::new(response)
2276 }
2277 Err(error) => {
2278 let error = Arc::new(error);
2279 *supported_versions_guard = Err(error.clone());
2280 return Err(HttpError::Cached(error));
2281 }
2282 };
2283
2284 let supported_versions = response.as_ref().map(|response| response.supported_versions());
2285
2286 // Only cache the result if the request was authenticated.
2287 if self.auth_ctx().has_valid_access_token() {
2288 if let Err(err) = self
2289 .state_store()
2290 .set_kv_data(
2291 StateStoreDataKey::SupportedVersions,
2292 StateStoreDataValue::SupportedVersions(response),
2293 )
2294 .await
2295 {
2296 warn!("error when caching supported versions: {err}");
2297 }
2298
2299 cached_supported_versions.set_value(supported_versions.clone());
2300 }
2301
2302 Ok(supported_versions.into_data())
2303 }
2304
2305 /// Get the Matrix versions and features supported by the homeserver by
2306 /// fetching them from the cache.
2307 ///
2308 /// For a version of this function that fetches the supported versions and
2309 /// features from the homeserver if the [`SupportedVersions`] aren't
2310 /// found in the cache, take a look at the [`Client::supported_versions()`]
2311 /// method.
2312 ///
2313 /// If the data in the cache has expired, this will trigger a background
2314 /// task to refresh it.
2315 ///
2316 /// # Examples
2317 ///
2318 /// ```no_run
2319 /// use ruma::api::{FeatureFlag, MatrixVersion};
2320 /// # use matrix_sdk::{Client, config::SyncSettings};
2321 /// # use url::Url;
2322 /// # async {
2323 /// # let homeserver = Url::parse("http://localhost:8080")?;
2324 /// # let mut client = Client::new(homeserver).await?;
2325 ///
2326 /// let supported =
2327 /// if let Some(supported) = client.supported_versions_cached().await? {
2328 /// supported
2329 /// } else {
2330 /// client.fetch_server_versions(None).await?.as_supported_versions()
2331 /// };
2332 ///
2333 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2334 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2335 ///
2336 /// let msc_x_feature = FeatureFlag::from("msc_x");
2337 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2338 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2339 /// # anyhow::Ok(()) };
2340 /// ```
2341 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2342 self.supported_versions_cached_inner(false).await
2343 }
2344
2345 async fn supported_versions_cached_inner(
2346 &self,
2347 failsafe: bool,
2348 ) -> Result<Option<SupportedVersions>, StoreError> {
2349 let supported_versions_cache = &self.inner.caches.supported_versions;
2350
2351 let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2352 cached
2353 } else if let Some(stored) = self
2354 .state_store()
2355 .get_kv_data(StateStoreDataKey::SupportedVersions)
2356 .await?
2357 .and_then(|value| value.into_supported_versions())
2358 {
2359 let stored = stored.map(|response| response.supported_versions());
2360
2361 // Copy the data from the store in the in-memory cache.
2362 supported_versions_cache.set_value(stored.clone());
2363
2364 stored
2365 } else {
2366 return Ok(None);
2367 };
2368
2369 // Spawn a task to refresh the cache if it has expired and we have a valid
2370 // access token.
2371 if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2372 debug!("spawning task to refresh supported versions cache");
2373
2374 let client = self.clone();
2375 self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2376 if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2377 warn!("failed to refresh supported versions cache: {error}");
2378 }
2379 });
2380 }
2381
2382 Ok(Some(value.into_data()))
2383 }
2384
2385 /// Get the Matrix versions supported by the homeserver by fetching them
2386 /// from the server or the cache.
2387 ///
2388 /// # Examples
2389 ///
2390 /// ```no_run
2391 /// use ruma::api::MatrixVersion;
2392 /// # use matrix_sdk::{Client, config::SyncSettings};
2393 /// # use url::Url;
2394 /// # async {
2395 /// # let homeserver = Url::parse("http://localhost:8080")?;
2396 /// # let mut client = Client::new(homeserver).await?;
2397 ///
2398 /// let server_versions = client.server_versions().await?;
2399 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2400 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2401 /// # anyhow::Ok(()) };
2402 /// ```
2403 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2404 Ok(self.supported_versions().await?.versions)
2405 }
2406
2407 /// Get the unstable features supported by the homeserver by fetching them
2408 /// from the server or the cache.
2409 ///
2410 /// # Examples
2411 ///
2412 /// ```no_run
2413 /// use matrix_sdk::ruma::api::FeatureFlag;
2414 /// # use matrix_sdk::{Client, config::SyncSettings};
2415 /// # use url::Url;
2416 /// # async {
2417 /// # let homeserver = Url::parse("http://localhost:8080")?;
2418 /// # let mut client = Client::new(homeserver).await?;
2419 ///
2420 /// let msc_x_feature = FeatureFlag::from("msc_x");
2421 /// let unstable_features = client.unstable_features().await?;
2422 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2423 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2424 /// # anyhow::Ok(()) };
2425 /// ```
2426 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2427 Ok(self.supported_versions().await?.features)
2428 }
2429
2430 /// Empty the supported versions and unstable features cache.
2431 ///
2432 /// Since the SDK caches the supported versions, it's possible to have a
2433 /// stale entry in the cache. This functions makes it possible to force
2434 /// reset it.
2435 pub async fn reset_supported_versions(&self) -> Result<()> {
2436 // Empty the in-memory cache.
2437 self.inner.caches.supported_versions.reset();
2438
2439 // Empty the store cache.
2440 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2441 }
2442
2443 /// Get the well-known file of the homeserver from the cache.
2444 ///
2445 /// If the data in the cache has expired, this will trigger a background
2446 /// task to refresh it.
2447 async fn well_known_cached(
2448 &self,
2449 ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2450 let well_known_cache = &self.inner.caches.well_known;
2451
2452 let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2453 cached
2454 } else if let Some(stored) = self
2455 .state_store()
2456 .get_kv_data(StateStoreDataKey::WellKnown)
2457 .await?
2458 .and_then(|value| value.into_well_known())
2459 {
2460 // Copy the data from the store into the in-memory cache.
2461 well_known_cache.set_value(stored.clone());
2462
2463 stored
2464 } else {
2465 return Ok(CachedValue::NotSet);
2466 };
2467
2468 // Spawn a task to refresh the cache if it has expired.
2469 if value.has_expired() {
2470 debug!("spawning task to refresh well-known cache");
2471
2472 let client = self.clone();
2473 self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2474 client.refresh_well_known_cache().await;
2475 });
2476 }
2477
2478 Ok(CachedValue::Cached(value.into_data()))
2479 }
2480
2481 /// Refresh the well-known file of the homeserver in the cache.
2482 async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2483 let well_known_cache = &self.inner.caches.well_known;
2484
2485 let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2486 Ok(guard) => guard,
2487 Err(_) => {
2488 // There is already a refresh in progress, wait for it to finish.
2489 let guard = well_known_cache.refresh_lock.lock().await;
2490
2491 // A refresh can't fail because we ignore failures, so there shouldn't be an
2492 // error in the refresh lock.
2493
2494 // Reuse the data if it was cached and it hasn't expired.
2495 if let CachedValue::Cached(value) = well_known_cache.value()
2496 && !value.has_expired()
2497 {
2498 return value.into_data();
2499 }
2500
2501 // The data wasn't cached or has expired, we need to make another request.
2502 guard
2503 }
2504 };
2505
2506 let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2507
2508 if let Err(err) = self
2509 .state_store()
2510 .set_kv_data(
2511 StateStoreDataKey::WellKnown,
2512 StateStoreDataValue::WellKnown(well_known.clone()),
2513 )
2514 .await
2515 {
2516 warn!("error when caching well-known: {err}");
2517 }
2518
2519 well_known_cache.set_value(well_known.clone());
2520
2521 well_known.into_data()
2522 }
2523
2524 /// Get the well-known file of the homeserver by fetching it from the server
2525 /// or the cache.
2526 async fn well_known(&self) -> Option<WellKnownResponse> {
2527 match self.well_known_cached().await {
2528 Ok(CachedValue::Cached(value)) => {
2529 return value;
2530 }
2531 Ok(CachedValue::NotSet) => {
2532 // The cache is empty, make a request.
2533 }
2534 Err(error) => {
2535 warn!("error when loading cached well-known: {error}");
2536 // Fallthrough to make a request.
2537 }
2538 }
2539
2540 self.refresh_well_known_cache().await
2541 }
2542
2543 /// Get information about the homeserver's advertised RTC foci by fetching
2544 /// the well-known file from the server or the cache.
2545 ///
2546 /// # Examples
2547 /// ```no_run
2548 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2549 /// # use url::Url;
2550 /// # async {
2551 /// # let homeserver = Url::parse("http://localhost:8080")?;
2552 /// # let mut client = Client::new(homeserver).await?;
2553 /// let rtc_foci = client.rtc_foci().await?;
2554 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2555 /// RtcFocusInfo::LiveKit(info) => Some(info),
2556 /// _ => None,
2557 /// });
2558 /// if let Some(info) = default_livekit_focus_info {
2559 /// println!("Default LiveKit service URL: {}", info.service_url);
2560 /// }
2561 /// # anyhow::Ok(()) };
2562 /// ```
2563 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2564 let well_known = self.well_known().await;
2565
2566 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2567 }
2568
2569 /// Empty the well-known cache.
2570 ///
2571 /// Since the SDK caches the well-known, it's possible to have a stale entry
2572 /// in the cache. This functions makes it possible to force reset it.
2573 pub async fn reset_well_known(&self) -> Result<()> {
2574 // Empty the in-memory caches.
2575 self.inner.caches.well_known.reset();
2576
2577 // Empty the store cache.
2578 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2579 }
2580
2581 /// Check whether MSC 4028 is enabled on the homeserver.
2582 ///
2583 /// # Examples
2584 ///
2585 /// ```no_run
2586 /// # use matrix_sdk::{Client, config::SyncSettings};
2587 /// # use url::Url;
2588 /// # async {
2589 /// # let homeserver = Url::parse("http://localhost:8080")?;
2590 /// # let mut client = Client::new(homeserver).await?;
2591 /// let msc4028_enabled =
2592 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2593 /// # anyhow::Ok(()) };
2594 /// ```
2595 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2596 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2597 }
2598
2599 /// Get information of all our own devices.
2600 ///
2601 /// # Examples
2602 ///
2603 /// ```no_run
2604 /// # use matrix_sdk::{Client, config::SyncSettings};
2605 /// # use url::Url;
2606 /// # async {
2607 /// # let homeserver = Url::parse("http://localhost:8080")?;
2608 /// # let mut client = Client::new(homeserver).await?;
2609 /// let response = client.devices().await?;
2610 ///
2611 /// for device in response.devices {
2612 /// println!(
2613 /// "Device: {} {}",
2614 /// device.device_id,
2615 /// device.display_name.as_deref().unwrap_or("")
2616 /// );
2617 /// }
2618 /// # anyhow::Ok(()) };
2619 /// ```
2620 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2621 let request = get_devices::v3::Request::new();
2622
2623 self.send(request).await
2624 }
2625
2626 /// Delete the given devices from the server.
2627 ///
2628 /// # Arguments
2629 ///
2630 /// * `devices` - The list of devices that should be deleted from the
2631 /// server.
2632 ///
2633 /// * `auth_data` - This request requires user interactive auth, the first
2634 /// request needs to set this to `None` and will always fail with an
2635 /// `UiaaResponse`. The response will contain information for the
2636 /// interactive auth and the same request needs to be made but this time
2637 /// with some `auth_data` provided.
2638 ///
2639 /// ```no_run
2640 /// # use matrix_sdk::{
2641 /// # ruma::{api::client::uiaa, owned_device_id},
2642 /// # Client, Error, config::SyncSettings,
2643 /// # };
2644 /// # use serde_json::json;
2645 /// # use url::Url;
2646 /// # use std::collections::BTreeMap;
2647 /// # async {
2648 /// # let homeserver = Url::parse("http://localhost:8080")?;
2649 /// # let mut client = Client::new(homeserver).await?;
2650 /// let devices = &[owned_device_id!("DEVICEID")];
2651 ///
2652 /// if let Err(e) = client.delete_devices(devices, None).await {
2653 /// if let Some(info) = e.as_uiaa_response() {
2654 /// let mut password = uiaa::Password::new(
2655 /// uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2656 /// "wordpass".to_owned(),
2657 /// );
2658 /// password.session = info.session.clone();
2659 ///
2660 /// client
2661 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2662 /// .await?;
2663 /// }
2664 /// }
2665 /// # anyhow::Ok(()) };
2666 pub async fn delete_devices(
2667 &self,
2668 devices: &[OwnedDeviceId],
2669 auth_data: Option<uiaa::AuthData>,
2670 ) -> HttpResult<delete_devices::v3::Response> {
2671 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2672 request.auth = auth_data;
2673
2674 self.send(request).await
2675 }
2676
2677 /// Change the display name of a device owned by the current user.
2678 ///
2679 /// Returns a `update_device::Response` which specifies the result
2680 /// of the operation.
2681 ///
2682 /// # Arguments
2683 ///
2684 /// * `device_id` - The ID of the device to change the display name of.
2685 /// * `display_name` - The new display name to set.
2686 pub async fn rename_device(
2687 &self,
2688 device_id: &DeviceId,
2689 display_name: &str,
2690 ) -> HttpResult<update_device::v3::Response> {
2691 let mut request = update_device::v3::Request::new(device_id.to_owned());
2692 request.display_name = Some(display_name.to_owned());
2693
2694 self.send(request).await
2695 }
2696
2697 /// Check whether a device with a specific ID exists on the server.
2698 ///
2699 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2700 /// with 404 and the underlying error otherwise.
2701 ///
2702 /// # Arguments
2703 ///
2704 /// * `device_id` - The ID of the device to query.
2705 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2706 let request = device::get_device::v3::Request::new(device_id);
2707 match self.send(request).await {
2708 Ok(_) => Ok(true),
2709 Err(err) => {
2710 if let Some(error) = err.as_client_api_error()
2711 && error.status_code == 404
2712 {
2713 Ok(false)
2714 } else {
2715 Err(err.into())
2716 }
2717 }
2718 }
2719 }
2720
2721 /// Synchronize the client's state with the latest state on the server.
2722 ///
2723 /// ## Syncing Events
2724 ///
2725 /// Messages or any other type of event need to be periodically fetched from
2726 /// the server, this is achieved by sending a `/sync` request to the server.
2727 ///
2728 /// The first sync is sent out without a [`token`]. The response of the
2729 /// first sync will contain a [`next_batch`] field which should then be
2730 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2731 /// don't receive the same events multiple times.
2732 ///
2733 /// ## Long Polling
2734 ///
2735 /// A sync should in the usual case always be in flight. The
2736 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2737 /// long the server will wait for new events before it will respond.
2738 /// The server will respond immediately if some new events arrive before the
2739 /// timeout has expired. If no changes arrive and the timeout expires an
2740 /// empty sync response will be sent to the client.
2741 ///
2742 /// This method of sending a request that may not receive a response
2743 /// immediately is called long polling.
2744 ///
2745 /// ## Filtering Events
2746 ///
2747 /// The number or type of messages and events that the client should receive
2748 /// from the server can be altered using a [`Filter`].
2749 ///
2750 /// Filters can be non-trivial and, since they will be sent with every sync
2751 /// request, they may take up a bunch of unnecessary bandwidth.
2752 ///
2753 /// Luckily filters can be uploaded to the server and reused using an unique
2754 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2755 /// method.
2756 ///
2757 /// # Arguments
2758 ///
2759 /// * `sync_settings` - Settings for the sync call, this allows us to set
2760 /// various options to configure the sync:
2761 /// * [`filter`] - To configure which events we receive and which get
2762 /// [filtered] by the server
2763 /// * [`timeout`] - To configure our [long polling] setup.
2764 /// * [`token`] - To tell the server which events we already received
2765 /// and where we wish to continue syncing.
2766 /// * [`full_state`] - To tell the server that we wish to receive all
2767 /// state events, regardless of our configured [`token`].
2768 /// * [`set_presence`] - To tell the server to set the presence and to
2769 /// which state.
2770 ///
2771 /// # Examples
2772 ///
2773 /// ```no_run
2774 /// # use url::Url;
2775 /// # async {
2776 /// # let homeserver = Url::parse("http://localhost:8080")?;
2777 /// # let username = "";
2778 /// # let password = "";
2779 /// use matrix_sdk::{
2780 /// Client, config::SyncSettings,
2781 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2782 /// };
2783 ///
2784 /// let client = Client::new(homeserver).await?;
2785 /// client.matrix_auth().login_username(username, password).send().await?;
2786 ///
2787 /// // Sync once so we receive the client state and old messages.
2788 /// client.sync_once(SyncSettings::default()).await?;
2789 ///
2790 /// // Register our handler so we start responding once we receive a new
2791 /// // event.
2792 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2793 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2794 /// });
2795 ///
2796 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2797 /// // from our `sync_once()` call automatically.
2798 /// client.sync(SyncSettings::default()).await;
2799 /// # anyhow::Ok(()) };
2800 /// ```
2801 ///
2802 /// [`sync`]: #method.sync
2803 /// [`SyncSettings`]: crate::config::SyncSettings
2804 /// [`token`]: crate::config::SyncSettings#method.token
2805 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2806 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2807 /// [`set_presence`]: ruma::presence::PresenceState
2808 /// [`filter`]: crate::config::SyncSettings#method.filter
2809 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2810 /// [`next_batch`]: SyncResponse#structfield.next_batch
2811 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2812 /// [long polling]: #long-polling
2813 /// [filtered]: #filtering-events
2814 #[instrument(skip(self))]
2815 pub async fn sync_once(
2816 &self,
2817 sync_settings: crate::config::SyncSettings,
2818 ) -> Result<SyncResponse> {
2819 // The sync might not return for quite a while due to the timeout.
2820 // We'll see if there's anything crypto related to send out before we
2821 // sync, i.e. if we closed our client after a sync but before the
2822 // crypto requests were sent out.
2823 //
2824 // This will mostly be a no-op.
2825 #[cfg(feature = "e2e-encryption")]
2826 if let Err(e) = self.send_outgoing_requests().await {
2827 error!(error = ?e, "Error while sending outgoing E2EE requests");
2828 }
2829
2830 let token = match sync_settings.token {
2831 SyncToken::Specific(token) => Some(token),
2832 SyncToken::NoToken => None,
2833 SyncToken::ReusePrevious => self.sync_token().await,
2834 };
2835
2836 let request = assign!(sync_events::v3::Request::new(), {
2837 filter: sync_settings.filter.map(|f| *f),
2838 since: token,
2839 full_state: sync_settings.full_state,
2840 set_presence: sync_settings.set_presence,
2841 timeout: sync_settings.timeout,
2842 use_state_after: true,
2843 });
2844 let mut request_config = self.request_config();
2845 if let Some(timeout) = sync_settings.timeout {
2846 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2847 request_config.timeout = Some(base_timeout + timeout);
2848 }
2849
2850 let response = self.send(request).with_request_config(request_config).await?;
2851 let next_batch = response.next_batch.clone();
2852 let response = self.process_sync(response).await?;
2853
2854 #[cfg(feature = "e2e-encryption")]
2855 if let Err(e) = self.send_outgoing_requests().await {
2856 error!(error = ?e, "Error while sending outgoing E2EE requests");
2857 }
2858
2859 self.inner.sync_beat.notify(usize::MAX);
2860
2861 Ok(SyncResponse::new(next_batch, response))
2862 }
2863
2864 /// Repeatedly synchronize the client state with the server.
2865 ///
2866 /// This method will only return on error, if cancellation is needed
2867 /// the method should be wrapped in a cancelable task or the
2868 /// [`Client::sync_with_callback`] method can be used or
2869 /// [`Client::sync_with_result_callback`] if you want to handle error
2870 /// cases in the loop, too.
2871 ///
2872 /// This method will internally call [`Client::sync_once`] in a loop.
2873 ///
2874 /// This method can be used with the [`Client::add_event_handler`]
2875 /// method to react to individual events. If you instead wish to handle
2876 /// events in a bulk manner the [`Client::sync_with_callback`],
2877 /// [`Client::sync_with_result_callback`] and
2878 /// [`Client::sync_stream`] methods can be used instead. Those methods
2879 /// repeatedly return the whole sync response.
2880 ///
2881 /// # Arguments
2882 ///
2883 /// * `sync_settings` - Settings for the sync call. *Note* that those
2884 /// settings will be only used for the first sync call. See the argument
2885 /// docs for [`Client::sync_once`] for more info.
2886 ///
2887 /// # Return
2888 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2889 /// up to the user of the API to check the error and decide whether the sync
2890 /// should continue or not.
2891 ///
2892 /// # Examples
2893 ///
2894 /// ```no_run
2895 /// # use url::Url;
2896 /// # async {
2897 /// # let homeserver = Url::parse("http://localhost:8080")?;
2898 /// # let username = "";
2899 /// # let password = "";
2900 /// use matrix_sdk::{
2901 /// Client, config::SyncSettings,
2902 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2903 /// };
2904 ///
2905 /// let client = Client::new(homeserver).await?;
2906 /// client.matrix_auth().login_username(&username, &password).send().await?;
2907 ///
2908 /// // Register our handler so we start responding once we receive a new
2909 /// // event.
2910 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2911 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2912 /// });
2913 ///
2914 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2915 /// // automatically.
2916 /// client.sync(SyncSettings::default()).await?;
2917 /// # anyhow::Ok(()) };
2918 /// ```
2919 ///
2920 /// [argument docs]: #method.sync_once
2921 /// [`sync_with_callback`]: #method.sync_with_callback
2922 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2923 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2924 }
2925
2926 /// Repeatedly call sync to synchronize the client state with the server.
2927 ///
2928 /// # Arguments
2929 ///
2930 /// * `sync_settings` - Settings for the sync call. *Note* that those
2931 /// settings will be only used for the first sync call. See the argument
2932 /// docs for [`Client::sync_once`] for more info.
2933 ///
2934 /// * `callback` - A callback that will be called every time a successful
2935 /// response has been fetched from the server. The callback must return a
2936 /// boolean which signalizes if the method should stop syncing. If the
2937 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2938 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2939 ///
2940 /// # Return
2941 /// The sync runs until an error occurs or the
2942 /// callback indicates that the Loop should stop. If the callback asked for
2943 /// a regular stop, the result will be `Ok(())` otherwise the
2944 /// `Err(Error)` is returned.
2945 ///
2946 /// # Examples
2947 ///
2948 /// The following example demonstrates how to sync forever while sending all
2949 /// the interesting events through a mpsc channel to another thread e.g. a
2950 /// UI thread.
2951 ///
2952 /// ```no_run
2953 /// # use std::time::Duration;
2954 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2955 /// # use url::Url;
2956 /// # async {
2957 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2958 /// # let mut client = Client::new(homeserver).await.unwrap();
2959 ///
2960 /// use tokio::sync::mpsc::channel;
2961 ///
2962 /// let (tx, rx) = channel(100);
2963 ///
2964 /// let sync_channel = &tx;
2965 /// let sync_settings = SyncSettings::new()
2966 /// .timeout(Duration::from_secs(30));
2967 ///
2968 /// client
2969 /// .sync_with_callback(sync_settings, |response| async move {
2970 /// let channel = sync_channel;
2971 /// for (room_id, room) in response.rooms.joined {
2972 /// for event in room.timeline.events {
2973 /// channel.send(event).await.unwrap();
2974 /// }
2975 /// }
2976 ///
2977 /// LoopCtrl::Continue
2978 /// })
2979 /// .await;
2980 /// };
2981 /// ```
2982 #[instrument(skip_all)]
2983 pub async fn sync_with_callback<C>(
2984 &self,
2985 sync_settings: crate::config::SyncSettings,
2986 callback: impl Fn(SyncResponse) -> C,
2987 ) -> Result<(), Error>
2988 where
2989 C: Future<Output = LoopCtrl>,
2990 {
2991 self.sync_with_result_callback(sync_settings, |result| async {
2992 Ok(callback(result?).await)
2993 })
2994 .await
2995 }
2996
2997 /// Repeatedly call sync to synchronize the client state with the server.
2998 ///
2999 /// # Arguments
3000 ///
3001 /// * `sync_settings` - Settings for the sync call. *Note* that those
3002 /// settings will be only used for the first sync call. See the argument
3003 /// docs for [`Client::sync_once`] for more info.
3004 ///
3005 /// * `callback` - A callback that will be called every time after a
3006 /// response has been received, failure or not. The callback returns a
3007 /// `Result<LoopCtrl, Error>`, too. When returning
3008 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3009 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3010 /// function returns `Ok(())`. In case the callback can't handle the
3011 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
3012 /// which results in the sync ending and the `Err(Error)` being returned.
3013 ///
3014 /// # Return
3015 /// The sync runs until an error occurs that the callback can't handle or
3016 /// the callback indicates that the Loop should stop. If the callback
3017 /// asked for a regular stop, the result will be `Ok(())` otherwise the
3018 /// `Err(Error)` is returned.
3019 ///
3020 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
3021 /// this, and are handled first without sending the result to the
3022 /// callback. Only after they have exceeded is the `Result` handed to
3023 /// the callback.
3024 ///
3025 /// # Examples
3026 ///
3027 /// The following example demonstrates how to sync forever while sending all
3028 /// the interesting events through a mpsc channel to another thread e.g. a
3029 /// UI thread.
3030 ///
3031 /// ```no_run
3032 /// # use std::time::Duration;
3033 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3034 /// # use url::Url;
3035 /// # async {
3036 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3037 /// # let mut client = Client::new(homeserver).await.unwrap();
3038 /// #
3039 /// use tokio::sync::mpsc::channel;
3040 ///
3041 /// let (tx, rx) = channel(100);
3042 ///
3043 /// let sync_channel = &tx;
3044 /// let sync_settings = SyncSettings::new()
3045 /// .timeout(Duration::from_secs(30));
3046 ///
3047 /// client
3048 /// .sync_with_result_callback(sync_settings, |response| async move {
3049 /// let channel = sync_channel;
3050 /// let sync_response = response?;
3051 /// for (room_id, room) in sync_response.rooms.joined {
3052 /// for event in room.timeline.events {
3053 /// channel.send(event).await.unwrap();
3054 /// }
3055 /// }
3056 ///
3057 /// Ok(LoopCtrl::Continue)
3058 /// })
3059 /// .await;
3060 /// };
3061 /// ```
3062 #[instrument(skip(self, callback))]
3063 pub async fn sync_with_result_callback<C>(
3064 &self,
3065 sync_settings: crate::config::SyncSettings,
3066 callback: impl Fn(Result<SyncResponse, Error>) -> C,
3067 ) -> Result<(), Error>
3068 where
3069 C: Future<Output = Result<LoopCtrl, Error>>,
3070 {
3071 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3072
3073 while let Some(result) = sync_stream.next().await {
3074 trace!("Running callback");
3075 if callback(result).await? == LoopCtrl::Break {
3076 trace!("Callback told us to stop");
3077 break;
3078 }
3079 trace!("Done running callback");
3080 }
3081
3082 Ok(())
3083 }
3084
3085 //// Repeatedly synchronize the client state with the server.
3086 ///
3087 /// This method will internally call [`Client::sync_once`] in a loop and is
3088 /// equivalent to the [`Client::sync`] method but the responses are provided
3089 /// as an async stream.
3090 ///
3091 /// # Arguments
3092 ///
3093 /// * `sync_settings` - Settings for the sync call. *Note* that those
3094 /// settings will be only used for the first sync call. See the argument
3095 /// docs for [`Client::sync_once`] for more info.
3096 ///
3097 /// # Examples
3098 ///
3099 /// ```no_run
3100 /// # use url::Url;
3101 /// # async {
3102 /// # let homeserver = Url::parse("http://localhost:8080")?;
3103 /// # let username = "";
3104 /// # let password = "";
3105 /// use futures_util::StreamExt;
3106 /// use matrix_sdk::{Client, config::SyncSettings};
3107 ///
3108 /// let client = Client::new(homeserver).await?;
3109 /// client.matrix_auth().login_username(&username, &password).send().await?;
3110 ///
3111 /// let mut sync_stream =
3112 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
3113 ///
3114 /// while let Some(Ok(response)) = sync_stream.next().await {
3115 /// for room in response.rooms.joined.values() {
3116 /// for e in &room.timeline.events {
3117 /// if let Ok(event) = e.raw().deserialize() {
3118 /// println!("Received event {:?}", event);
3119 /// }
3120 /// }
3121 /// }
3122 /// }
3123 ///
3124 /// # anyhow::Ok(()) };
3125 /// ```
3126 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3127 #[instrument(skip(self))]
3128 pub async fn sync_stream(
3129 &self,
3130 mut sync_settings: crate::config::SyncSettings,
3131 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3132 let mut is_first_sync = true;
3133 let mut timeout = None;
3134 let mut last_sync_time: Option<Instant> = None;
3135
3136 let parent_span = Span::current();
3137
3138 async_stream::stream!({
3139 loop {
3140 trace!("Syncing");
3141
3142 if sync_settings.ignore_timeout_on_first_sync {
3143 if is_first_sync {
3144 timeout = sync_settings.timeout.take();
3145 } else if sync_settings.timeout.is_none() && timeout.is_some() {
3146 sync_settings.timeout = timeout.take();
3147 }
3148
3149 is_first_sync = false;
3150 }
3151
3152 yield self
3153 .sync_loop_helper(&mut sync_settings)
3154 .instrument(parent_span.clone())
3155 .await;
3156
3157 Client::delay_sync(&mut last_sync_time).await
3158 }
3159 })
3160 }
3161
3162 /// Get the current, if any, sync token of the client.
3163 /// This will be None if the client didn't sync at least once.
3164 pub(crate) async fn sync_token(&self) -> Option<String> {
3165 self.inner.base_client.sync_token().await
3166 }
3167
3168 /// Gets information about the owner of a given access token.
3169 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3170 let request = whoami::v3::Request::new();
3171 self.send(request).await
3172 }
3173
3174 /// Subscribes a new receiver to client SessionChange broadcasts.
3175 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3176 let broadcast = &self.auth_ctx().session_change_sender;
3177 broadcast.subscribe()
3178 }
3179
3180 /// Sets the save/restore session callbacks.
3181 ///
3182 /// This is another mechanism to get synchronous updates to session tokens,
3183 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3184 pub fn set_session_callbacks(
3185 &self,
3186 reload_session_callback: Box<ReloadSessionCallback>,
3187 save_session_callback: Box<SaveSessionCallback>,
3188 ) -> Result<()> {
3189 self.inner
3190 .auth_ctx
3191 .reload_session_callback
3192 .set(reload_session_callback)
3193 .map_err(|_| Error::MultipleSessionCallbacks)?;
3194
3195 self.inner
3196 .auth_ctx
3197 .save_session_callback
3198 .set(save_session_callback)
3199 .map_err(|_| Error::MultipleSessionCallbacks)?;
3200
3201 Ok(())
3202 }
3203
3204 /// Get the notification settings of the current owner of the client.
3205 pub async fn notification_settings(&self) -> NotificationSettings {
3206 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3207 NotificationSettings::new(self.clone(), ruleset)
3208 }
3209
3210 /// Create a new specialized `Client` that can process notifications.
3211 ///
3212 /// See [`CrossProcessLock::new`] to learn more about
3213 /// `cross_process_lock_config`.
3214 ///
3215 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3216 pub async fn notification_client(
3217 &self,
3218 cross_process_lock_config: CrossProcessLockConfig,
3219 ) -> Result<Client> {
3220 let client = Client {
3221 inner: ClientInner::new(
3222 self.inner.auth_ctx.clone(),
3223 self.server().cloned(),
3224 self.homeserver(),
3225 self.sliding_sync_version(),
3226 self.inner.http_client.clone(),
3227 self.inner
3228 .base_client
3229 .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3230 .await?,
3231 self.inner.caches.supported_versions.value(),
3232 self.inner.caches.well_known.value(),
3233 self.inner.respect_login_well_known,
3234 self.inner.event_cache.clone(),
3235 self.inner.send_queue_data.clone(),
3236 self.inner.latest_events.clone(),
3237 #[cfg(feature = "e2e-encryption")]
3238 self.inner.e2ee.encryption_settings,
3239 #[cfg(feature = "e2e-encryption")]
3240 self.inner.enable_share_history_on_invite,
3241 cross_process_lock_config,
3242 #[cfg(feature = "experimental-search")]
3243 self.inner.search_index.clone(),
3244 self.inner.thread_subscription_catchup.clone(),
3245 )
3246 .await,
3247 };
3248
3249 Ok(client)
3250 }
3251
3252 /// The [`EventCache`] instance for this [`Client`].
3253 pub fn event_cache(&self) -> &EventCache {
3254 // SAFETY: always initialized in the `Client` ctor.
3255 self.inner.event_cache.get().unwrap()
3256 }
3257
3258 /// The [`LatestEvents`] instance for this [`Client`].
3259 pub async fn latest_events(&self) -> &LatestEvents {
3260 self.inner
3261 .latest_events
3262 .get_or_init(|| async {
3263 LatestEvents::new(
3264 WeakClient::from_client(self),
3265 self.event_cache().clone(),
3266 SendQueue::new(self.clone()),
3267 self.room_info_notable_update_receiver(),
3268 )
3269 })
3270 .await
3271 }
3272
3273 /// Waits until an at least partially synced room is received, and returns
3274 /// it.
3275 ///
3276 /// **Note: this function will loop endlessly until either it finds the room
3277 /// or an externally set timeout happens.**
3278 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3279 loop {
3280 if let Some(room) = self.get_room(room_id) {
3281 if room.is_state_partially_or_fully_synced() {
3282 debug!("Found just created room!");
3283 return room;
3284 }
3285 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3286 } else {
3287 debug!("Room wasn't found, waiting for sync beat to try again");
3288 }
3289 self.inner.sync_beat.listen().await;
3290 }
3291 }
3292
3293 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3294 /// join it.
3295 pub async fn knock(
3296 &self,
3297 room_id_or_alias: OwnedRoomOrAliasId,
3298 reason: Option<String>,
3299 server_names: Vec<OwnedServerName>,
3300 ) -> Result<Room> {
3301 let request =
3302 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3303 let response = self.send(request).await?;
3304 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3305 Ok(Room::new(self.clone(), base_room))
3306 }
3307
3308 /// Checks whether the provided `user_id` belongs to an ignored user.
3309 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3310 self.base_client().is_user_ignored(user_id).await
3311 }
3312
3313 /// Gets the `max_upload_size` value from the homeserver, getting either a
3314 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3315 /// missing.
3316 ///
3317 /// Check the spec for more info:
3318 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3319 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3320 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3321 if let Some(data) = max_upload_size_lock.get() {
3322 return Ok(data.to_owned());
3323 }
3324
3325 // Use the authenticated endpoint when the server supports it.
3326 let supported_versions = self.supported_versions().await?;
3327 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3328 .is_supported(&supported_versions);
3329
3330 let upload_size = if use_auth {
3331 self.send(authenticated_media::get_media_config::v1::Request::default())
3332 .await?
3333 .upload_size
3334 } else {
3335 #[allow(deprecated)]
3336 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3337 };
3338
3339 match max_upload_size_lock.set(upload_size) {
3340 Ok(_) => Ok(upload_size),
3341 Err(error) => {
3342 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3343 }
3344 }
3345 }
3346
3347 /// The settings to use for decrypting events.
3348 #[cfg(feature = "e2e-encryption")]
3349 pub fn decryption_settings(&self) -> &DecryptionSettings {
3350 &self.base_client().decryption_settings
3351 }
3352
3353 /// Returns the [`SearchIndex`] for this [`Client`].
3354 #[cfg(feature = "experimental-search")]
3355 pub fn search_index(&self) -> &SearchIndex {
3356 &self.inner.search_index
3357 }
3358
3359 /// Whether the client is configured to take thread subscriptions (MSC4306
3360 /// and MSC4308) into account, and the server enabled the experimental
3361 /// feature flag for it.
3362 ///
3363 /// This may cause filtering out of thread subscriptions, and loading the
3364 /// thread subscriptions via the sliding sync extension, when the room
3365 /// list service is being used.
3366 ///
3367 /// This is async and fallible as it may use the network to retrieve the
3368 /// server supported features, if they aren't cached already.
3369 pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3370 // Check if the client is configured to support thread subscriptions first.
3371 match self.base_client().threading_support {
3372 ThreadingSupport::Enabled { with_subscriptions: false }
3373 | ThreadingSupport::Disabled => return Ok(false),
3374 ThreadingSupport::Enabled { with_subscriptions: true } => {}
3375 }
3376
3377 // Now, let's check that the server supports it.
3378 let server_enabled = self
3379 .supported_versions()
3380 .await?
3381 .features
3382 .contains(&FeatureFlag::from("org.matrix.msc4306"));
3383
3384 Ok(server_enabled)
3385 }
3386
3387 /// Fetch thread subscriptions changes between `from` and up to `to`.
3388 ///
3389 /// The `limit` optional parameter can be used to limit the number of
3390 /// entries in a response. It can also be overridden by the server, if
3391 /// it's deemed too large.
3392 pub async fn fetch_thread_subscriptions(
3393 &self,
3394 from: Option<String>,
3395 to: Option<String>,
3396 limit: Option<UInt>,
3397 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3398 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3399 from,
3400 to,
3401 limit,
3402 });
3403 Ok(self.send(request).await?)
3404 }
3405
3406 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3407 self.inner.thread_subscription_catchup.get().unwrap()
3408 }
3409
3410 /// Perform database optimizations if any are available, i.e. vacuuming in
3411 /// SQLite.
3412 ///
3413 /// **Warning:** this was added to check if SQLite fragmentation was the
3414 /// source of performance issues, **DO NOT use in production**.
3415 #[doc(hidden)]
3416 pub async fn optimize_stores(&self) -> Result<()> {
3417 trace!("Optimizing state store...");
3418 self.state_store().optimize().await?;
3419
3420 trace!("Optimizing event cache store...");
3421 if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3422 clean_lock.optimize().await?;
3423 }
3424
3425 trace!("Optimizing media store...");
3426 self.media_store().lock().await?.optimize().await?;
3427
3428 Ok(())
3429 }
3430
3431 /// Returns the sizes of the existing stores, if known.
3432 pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3433 #[cfg(feature = "e2e-encryption")]
3434 let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3435 && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3436 {
3437 Some(store_size)
3438 } else {
3439 None
3440 };
3441 #[cfg(not(feature = "e2e-encryption"))]
3442 let crypto_store_size = None;
3443
3444 let state_store_size = self.state_store().get_size().await.ok().flatten();
3445
3446 let event_cache_store_size = if let Some(clean_lock) =
3447 self.event_cache_store().lock().await?.as_clean()
3448 && let Ok(Some(store_size)) = clean_lock.get_size().await
3449 {
3450 Some(store_size)
3451 } else {
3452 None
3453 };
3454
3455 let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3456
3457 Ok(StoreSizes {
3458 crypto_store: crypto_store_size,
3459 state_store: state_store_size,
3460 event_cache_store: event_cache_store_size,
3461 media_store: media_store_size,
3462 })
3463 }
3464
3465 /// Get a reference to the client's task monitor, for spawning background
3466 /// tasks.
3467 pub fn task_monitor(&self) -> &TaskMonitor {
3468 &self.inner.task_monitor
3469 }
3470
3471 /// Add a subscriber for duplicate key upload error notifications triggered
3472 /// by requests to /keys/upload.
3473 #[cfg(feature = "e2e-encryption")]
3474 pub fn subscribe_to_duplicate_key_upload_errors(
3475 &self,
3476 ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3477 self.inner.duplicate_key_upload_error_sender.subscribe()
3478 }
3479
3480 /// Check the record of whether we are waiting for an [MSC4268] key bundle
3481 /// for the given room.
3482 ///
3483 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3484 #[cfg(feature = "e2e-encryption")]
3485 pub async fn get_pending_key_bundle_details_for_room(
3486 &self,
3487 room_id: &RoomId,
3488 ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3489 Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3490 }
3491
3492 /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3493 /// a DM.
3494 pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3495 &self.inner.base_client.dm_room_definition
3496 }
3497}
3498
3499/// Contains the disk size of the different stores, if known. It won't be
3500/// available for in-memory stores.
3501#[derive(Debug, Clone)]
3502pub struct StoreSizes {
3503 /// The size of the CryptoStore.
3504 pub crypto_store: Option<usize>,
3505 /// The size of the StateStore.
3506 pub state_store: Option<usize>,
3507 /// The size of the EventCacheStore.
3508 pub event_cache_store: Option<usize>,
3509 /// The size of the MediaStore.
3510 pub media_store: Option<usize>,
3511}
3512
3513#[cfg(any(feature = "testing", test))]
3514impl Client {
3515 /// Test helper to mark users as tracked by the crypto layer.
3516 #[cfg(feature = "e2e-encryption")]
3517 pub async fn update_tracked_users_for_testing(
3518 &self,
3519 user_ids: impl IntoIterator<Item = &UserId>,
3520 ) {
3521 let olm = self.olm_machine().await;
3522 let olm = olm.as_ref().unwrap();
3523 olm.update_tracked_users(user_ids).await.unwrap();
3524 }
3525}
3526
3527/// A weak reference to the inner client, useful when trying to get a handle
3528/// on the owning client.
3529#[derive(Clone, Debug)]
3530pub(crate) struct WeakClient {
3531 client: Weak<ClientInner>,
3532}
3533
3534impl WeakClient {
3535 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3536 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3537 Self { client: Arc::downgrade(client) }
3538 }
3539
3540 /// Construct a [`WeakClient`] from a [`Client`].
3541 pub fn from_client(client: &Client) -> Self {
3542 Self::from_inner(&client.inner)
3543 }
3544
3545 /// Attempts to get a [`Client`] from this [`WeakClient`].
3546 pub fn get(&self) -> Option<Client> {
3547 self.client.upgrade().map(|inner| Client { inner })
3548 }
3549
3550 /// Gets the number of strong (`Arc`) pointers still pointing to this
3551 /// client.
3552 #[allow(dead_code)]
3553 pub fn strong_count(&self) -> usize {
3554 self.client.strong_count()
3555 }
3556}
3557
3558/// Information about the state of a room before we joined it.
3559#[derive(Debug, Clone, Default)]
3560struct PreJoinRoomInfo {
3561 /// The user who invited us to the room, if any.
3562 pub inviter: Option<RoomMember>,
3563}
3564
3565// The http mocking library is not supported for wasm32
3566#[cfg(all(test, not(target_family = "wasm")))]
3567pub(crate) mod tests {
3568 use std::{sync::Arc, time::Duration};
3569
3570 use assert_matches::assert_matches;
3571 use assert_matches2::assert_let;
3572 use eyeball::SharedObservable;
3573 use futures_util::{FutureExt, StreamExt, pin_mut};
3574 use js_int::{UInt, uint};
3575 use matrix_sdk_base::{
3576 RoomState,
3577 store::{MemoryStore, StoreConfig},
3578 ttl::TtlValue,
3579 };
3580 use matrix_sdk_test::{
3581 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3582 event_factory::EventFactory,
3583 };
3584 #[cfg(target_family = "wasm")]
3585 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3586
3587 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3588 use ruma::{
3589 RoomId, ServerName, UserId,
3590 api::{
3591 FeatureFlag, MatrixVersion,
3592 client::{
3593 discovery::discover_homeserver::RtcFocusInfo,
3594 room::create_room::v3::Request as CreateRoomRequest,
3595 },
3596 },
3597 assign,
3598 events::{
3599 ignored_user_list::IgnoredUserListEventContent,
3600 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3601 },
3602 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3603 };
3604 use serde_json::json;
3605 use stream_assert::{assert_next_matches, assert_pending};
3606 use tokio::{
3607 spawn,
3608 time::{sleep, timeout},
3609 };
3610 use url::Url;
3611
3612 use super::Client;
3613 use crate::{
3614 Error, TransmissionProgress,
3615 client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3616 config::{RequestConfig, SyncSettings},
3617 futures::SendRequest,
3618 media::MediaError,
3619 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3620 };
3621
3622 #[async_test]
3623 async fn test_account_data() {
3624 let server = MatrixMockServer::new().await;
3625 let client = server.client_builder().build().await;
3626
3627 let f = EventFactory::new();
3628 server
3629 .mock_sync()
3630 .ok_and_run(&client, |builder| {
3631 builder.add_global_account_data(
3632 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3633 );
3634 })
3635 .await;
3636
3637 let content = client
3638 .account()
3639 .account_data::<IgnoredUserListEventContent>()
3640 .await
3641 .unwrap()
3642 .unwrap()
3643 .deserialize()
3644 .unwrap();
3645
3646 assert_eq!(content.ignored_users.len(), 1);
3647 }
3648
3649 #[async_test]
3650 async fn test_successful_discovery() {
3651 // Imagine this is `matrix.org`.
3652 let server = MatrixMockServer::new().await;
3653 let server_url = server.uri();
3654
3655 // Imagine this is `matrix-client.matrix.org`.
3656 let homeserver = MatrixMockServer::new().await;
3657 let homeserver_url = homeserver.uri();
3658
3659 // Imagine Alice has the user ID `@alice:matrix.org`.
3660 let domain = server_url.strip_prefix("http://").unwrap();
3661 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3662
3663 // The `.well-known` is on the server (e.g. `matrix.org`).
3664 server
3665 .mock_well_known()
3666 .ok_with_homeserver_url(&homeserver_url)
3667 .mock_once()
3668 .named("well-known")
3669 .mount()
3670 .await;
3671
3672 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3673 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3674
3675 let client = Client::builder()
3676 .insecure_server_name_no_tls(alice.server_name())
3677 .build()
3678 .await
3679 .unwrap();
3680
3681 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3682 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3683 client.server_versions().await.unwrap();
3684 }
3685
3686 #[async_test]
3687 async fn test_discovery_broken_server() {
3688 let server = MatrixMockServer::new().await;
3689 let server_url = server.uri();
3690 let domain = server_url.strip_prefix("http://").unwrap();
3691 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3692
3693 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3694
3695 assert!(
3696 Client::builder()
3697 .insecure_server_name_no_tls(alice.server_name())
3698 .build()
3699 .await
3700 .is_err(),
3701 "Creating a client from a user ID should fail when the .well-known request fails."
3702 );
3703 }
3704
3705 #[async_test]
3706 async fn test_room_creation() {
3707 let server = MatrixMockServer::new().await;
3708 let client = server.client_builder().build().await;
3709
3710 let f = EventFactory::new().sender(user_id!("@example:localhost"));
3711 server
3712 .mock_sync()
3713 .ok_and_run(&client, |builder| {
3714 builder.add_joined_room(
3715 JoinedRoomBuilder::default()
3716 .add_state_event(
3717 f.member(user_id!("@example:localhost")).display_name("example"),
3718 )
3719 .add_state_event(f.default_power_levels()),
3720 );
3721 })
3722 .await;
3723
3724 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3725 assert_eq!(room.state(), RoomState::Joined);
3726 }
3727
3728 #[async_test]
3729 async fn test_retry_limit_http_requests() {
3730 let server = MatrixMockServer::new().await;
3731 let client = server
3732 .client_builder()
3733 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3734 .build()
3735 .await;
3736
3737 assert!(client.request_config().retry_limit.unwrap() == 4);
3738
3739 server.mock_who_am_i().error500().expect(4).mount().await;
3740
3741 client.whoami().await.unwrap_err();
3742 }
3743
3744 #[async_test]
3745 async fn test_retry_timeout_http_requests() {
3746 // Keep this timeout small so that the test doesn't take long
3747 let retry_timeout = Duration::from_secs(5);
3748 let server = MatrixMockServer::new().await;
3749 let client = server
3750 .client_builder()
3751 .on_builder(|builder| {
3752 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3753 })
3754 .build()
3755 .await;
3756
3757 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3758
3759 server.mock_login().error500().expect(2..).mount().await;
3760
3761 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3762 }
3763
3764 #[async_test]
3765 async fn test_short_retry_initial_http_requests() {
3766 let server = MatrixMockServer::new().await;
3767 let client = server
3768 .client_builder()
3769 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3770 .build()
3771 .await;
3772
3773 server.mock_login().error500().expect(3..).mount().await;
3774
3775 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3776 }
3777
3778 #[async_test]
3779 async fn test_no_retry_http_requests() {
3780 let server = MatrixMockServer::new().await;
3781 let client = server.client_builder().build().await;
3782
3783 server.mock_devices().error500().mock_once().mount().await;
3784
3785 client.devices().await.unwrap_err();
3786 }
3787
3788 #[async_test]
3789 async fn test_set_homeserver() {
3790 let client = MockClientBuilder::new(None).build().await;
3791 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3792
3793 let homeserver = Url::parse("http://example.com/").unwrap();
3794 client.set_homeserver(homeserver.clone());
3795 assert_eq!(client.homeserver(), homeserver);
3796 }
3797
3798 #[async_test]
3799 async fn test_search_user_request() {
3800 let server = MatrixMockServer::new().await;
3801 let client = server.client_builder().build().await;
3802
3803 server.mock_user_directory().ok().mock_once().mount().await;
3804
3805 let response = client.search_users("test", 50).await.unwrap();
3806 assert_eq!(response.results.len(), 1);
3807 let result = &response.results[0];
3808 assert_eq!(result.user_id.to_string(), "@test:example.me");
3809 assert_eq!(result.display_name.clone().unwrap(), "Test");
3810 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3811 assert!(!response.limited);
3812 }
3813
3814 #[async_test]
3815 async fn test_request_unstable_features() {
3816 let server = MatrixMockServer::new().await;
3817 let client = server.client_builder().no_server_versions().build().await;
3818
3819 server
3820 .mock_versions()
3821 .with_feature("org.matrix.e2e_cross_signing", true)
3822 .ok()
3823 .mock_once()
3824 .mount()
3825 .await;
3826
3827 let unstable_features = client.unstable_features().await.unwrap();
3828 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3829 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3830 }
3831
3832 #[async_test]
3833 async fn test_can_homeserver_push_encrypted_event_to_device() {
3834 let server = MatrixMockServer::new().await;
3835 let client = server.client_builder().no_server_versions().build().await;
3836
3837 server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3838
3839 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3840 assert!(msc4028_enabled);
3841 }
3842
3843 #[async_test]
3844 async fn test_recently_visited_rooms() {
3845 // Tracking recently visited rooms requires authentication
3846 let client = MockClientBuilder::new(None).unlogged().build().await;
3847 assert_matches!(
3848 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3849 Err(Error::AuthenticationRequired)
3850 );
3851
3852 let client = MockClientBuilder::new(None).build().await;
3853 let account = client.account();
3854
3855 // We should start off with an empty list
3856 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3857
3858 // Tracking a valid room id should add it to the list
3859 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3860 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3861 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3862
3863 // And the existing list shouldn't be changed
3864 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3865 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3866
3867 // Tracking the same room again shouldn't change the list
3868 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3869 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3870 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3871
3872 // Tracking a second room should add it to the front of the list
3873 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3874 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3875 assert_eq!(
3876 account.get_recently_visited_rooms().await.unwrap(),
3877 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3878 );
3879
3880 // Tracking the first room yet again should move it to the front of the list
3881 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3882 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3883 assert_eq!(
3884 account.get_recently_visited_rooms().await.unwrap(),
3885 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3886 );
3887
3888 // Tracking should be capped at 20
3889 for n in 0..20 {
3890 account
3891 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3892 .await
3893 .unwrap();
3894 }
3895
3896 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3897
3898 // And the initial rooms should've been pushed out
3899 let rooms = account.get_recently_visited_rooms().await.unwrap();
3900 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3901 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3902
3903 // And the last tracked room should be the first
3904 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3905 }
3906
3907 #[async_test]
3908 async fn test_client_no_cycle_with_event_cache() {
3909 let client = MockClientBuilder::new(None).build().await;
3910
3911 // Wait for the init tasks to die.
3912 sleep(Duration::from_secs(1)).await;
3913
3914 let weak_client = WeakClient::from_client(&client);
3915 assert_eq!(weak_client.strong_count(), 1);
3916
3917 {
3918 let room_id = room_id!("!room:example.org");
3919
3920 // Have the client know the room.
3921 let response = SyncResponseBuilder::default()
3922 .add_joined_room(JoinedRoomBuilder::new(room_id))
3923 .build_sync_response();
3924 client.inner.base_client.receive_sync_response(response).await.unwrap();
3925
3926 client.event_cache().subscribe().unwrap();
3927
3928 let (_room_event_cache, _drop_handles) =
3929 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3930 }
3931
3932 drop(client);
3933
3934 // Give a bit of time for background tasks to die.
3935 sleep(Duration::from_secs(1)).await;
3936
3937 // The weak client must be the last reference to the client now.
3938 assert_eq!(weak_client.strong_count(), 0);
3939 let client = weak_client.get();
3940 assert!(
3941 client.is_none(),
3942 "too many strong references to the client: {}",
3943 Arc::strong_count(&client.unwrap().inner)
3944 );
3945 }
3946
3947 #[async_test]
3948 async fn test_supported_versions_caching() {
3949 let server = MatrixMockServer::new().await;
3950
3951 let versions_mock = server
3952 .mock_versions()
3953 .expect_default_access_token()
3954 .with_feature("org.matrix.e2e_cross_signing", true)
3955 .ok()
3956 .named("first versions mock")
3957 .expect(1)
3958 .mount_as_scoped()
3959 .await;
3960
3961 let memory_store = Arc::new(MemoryStore::new());
3962 let client = server
3963 .client_builder()
3964 .no_server_versions()
3965 .on_builder(|builder| {
3966 builder.store_config(
3967 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3968 .state_store(memory_store.clone()),
3969 )
3970 })
3971 .build()
3972 .await;
3973
3974 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3975
3976 // The result was cached.
3977 assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3978 // This subsequent call hits the in-memory cache.
3979 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3980
3981 drop(client);
3982
3983 let client = server
3984 .client_builder()
3985 .no_server_versions()
3986 .on_builder(|builder| {
3987 builder.store_config(
3988 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3989 .state_store(memory_store.clone()),
3990 )
3991 })
3992 .build()
3993 .await;
3994
3995 // These calls to the new client hit the on-disk cache.
3996 assert!(
3997 client
3998 .unstable_features()
3999 .await
4000 .unwrap()
4001 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
4002 );
4003
4004 let supported = client.supported_versions().await.unwrap();
4005 assert!(supported.versions.contains(&MatrixVersion::V1_0));
4006 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4007
4008 // Then this call hits the in-memory cache.
4009 let supported = client.supported_versions().await.unwrap();
4010 assert!(supported.versions.contains(&MatrixVersion::V1_0));
4011 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4012
4013 drop(versions_mock);
4014
4015 // Now, reset the cache, and observe the endpoint being called again once.
4016 client.reset_supported_versions().await.unwrap();
4017
4018 server.mock_versions().ok().expect(2).named("second versions mock").mount().await;
4019
4020 // Hits network again.
4021 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4022 // Hits in-memory cache again.
4023 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4024 assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4025
4026 // Force an expiry of the data.
4027 let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
4028 let mut ttl_value = TtlValue::new(supported_versions);
4029 ttl_value.expire();
4030 client.inner.caches.supported_versions.set_value(ttl_value);
4031
4032 // Call the method to trigger a cache refresh background task.
4033 client.supported_versions_cached().await.unwrap().unwrap();
4034
4035 // We wait for the task to finish, the endpoint should have been called again.
4036 sleep(Duration::from_secs(1)).await;
4037 assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4038 }
4039
4040 #[async_test]
4041 async fn test_well_known_caching() {
4042 let server = MatrixMockServer::new().await;
4043 let server_url = server.uri();
4044 let domain = server_url.strip_prefix("http://").unwrap();
4045 let server_name = <&ServerName>::try_from(domain).unwrap();
4046 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
4047
4048 let well_known_mock = server
4049 .mock_well_known()
4050 .ok()
4051 .named("well known mock")
4052 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
4053 .mount_as_scoped()
4054 .await;
4055
4056 let memory_store = Arc::new(MemoryStore::new());
4057 let client = Client::builder()
4058 .insecure_server_name_no_tls(server_name)
4059 .store_config(
4060 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4061 .state_store(memory_store.clone()),
4062 )
4063 .build()
4064 .await
4065 .unwrap();
4066
4067 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4068
4069 // This subsequent call hits the in-memory cache.
4070 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4071
4072 drop(client);
4073
4074 let client = server
4075 .client_builder()
4076 .no_server_versions()
4077 .on_builder(|builder| {
4078 builder.store_config(
4079 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4080 .state_store(memory_store.clone()),
4081 )
4082 })
4083 .build()
4084 .await;
4085
4086 // This call to the new client hits the on-disk cache.
4087 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4088
4089 // Then this call hits the in-memory cache.
4090 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4091
4092 drop(well_known_mock);
4093
4094 // Now, reset the cache, and observe the endpoints being called again once.
4095 client.reset_well_known().await.unwrap();
4096
4097 server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
4098
4099 // Hits network again.
4100 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4101 // Hits in-memory cache again.
4102 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4103
4104 // Force an expiry of the data.
4105 let well_known = client.well_known().await;
4106 let mut ttl_value = TtlValue::new(well_known);
4107 ttl_value.expire();
4108 client.inner.caches.well_known.set_value(ttl_value);
4109
4110 // Call the method again to trigger a cache refresh background task.
4111 client.well_known().await;
4112
4113 // We wait for the task to finish, the endpoint should have been called again.
4114 // We need to wait a bit because the first requests using the server name of the
4115 // user will fail, only the requests using the homeserver URL will succeed.
4116 sleep(Duration::from_secs(5)).await;
4117 assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
4118 }
4119
4120 #[async_test]
4121 async fn test_missing_well_known_caching() {
4122 let server = MatrixMockServer::new().await;
4123 let rtc_foci: Vec<RtcFocusInfo> = vec![];
4124
4125 let well_known_mock = server
4126 .mock_well_known()
4127 .error_unrecognized()
4128 .named("first well-known mock")
4129 .expect(1)
4130 .mount_as_scoped()
4131 .await;
4132
4133 let memory_store = Arc::new(MemoryStore::new());
4134 let client = server
4135 .client_builder()
4136 .on_builder(|builder| {
4137 builder.store_config(
4138 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4139 .state_store(memory_store.clone()),
4140 )
4141 })
4142 .build()
4143 .await;
4144
4145 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4146
4147 // This subsequent call hits the in-memory cache.
4148 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4149
4150 drop(client);
4151
4152 let client = server
4153 .client_builder()
4154 .on_builder(|builder| {
4155 builder.store_config(
4156 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4157 .state_store(memory_store.clone()),
4158 )
4159 })
4160 .build()
4161 .await;
4162
4163 // This call to the new client hits the on-disk cache.
4164 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4165
4166 // Then this call hits the in-memory cache.
4167 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4168
4169 drop(well_known_mock);
4170
4171 // Now, reset the cache, and observe the endpoints being called again once.
4172 client.reset_well_known().await.unwrap();
4173
4174 server
4175 .mock_well_known()
4176 .error_unrecognized()
4177 .expect(1)
4178 .named("second well-known mock")
4179 .mount()
4180 .await;
4181
4182 // Hits network again.
4183 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4184 // Hits in-memory cache again.
4185 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4186 }
4187
4188 #[async_test]
4189 async fn test_no_network_doesnt_cause_infinite_retries() {
4190 // We want infinite retries for transient errors.
4191 let client = MockClientBuilder::new(None)
4192 .on_builder(|builder| builder.request_config(RequestConfig::new()))
4193 .build()
4194 .await;
4195
4196 // We don't define a mock server on purpose here, so that the error is really a
4197 // network error.
4198 client.whoami().await.unwrap_err();
4199 }
4200
4201 #[async_test]
4202 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4203 let server = MatrixMockServer::new().await;
4204 let client = server.client_builder().build().await;
4205
4206 let room_id = room_id!("!room:example.org");
4207
4208 server
4209 .mock_sync()
4210 .ok_and_run(&client, |builder| {
4211 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4212 })
4213 .await;
4214
4215 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4216 assert_eq!(room.room_id(), room_id);
4217 }
4218
4219 #[async_test]
4220 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4221 let server = MatrixMockServer::new().await;
4222 let client = server.client_builder().build().await;
4223
4224 let room_id = room_id!("!room:example.org");
4225
4226 let client = Arc::new(client);
4227
4228 // Perform the /sync request with a delay so it starts after the
4229 // `await_room_remote_echo` call has happened
4230 spawn({
4231 let client = client.clone();
4232 async move {
4233 sleep(Duration::from_millis(100)).await;
4234
4235 server
4236 .mock_sync()
4237 .ok_and_run(&client, |builder| {
4238 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4239 })
4240 .await;
4241 }
4242 });
4243
4244 let room =
4245 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4246 assert_eq!(room.room_id(), room_id);
4247 }
4248
4249 #[async_test]
4250 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4251 let client = MockClientBuilder::new(None).build().await;
4252
4253 let room_id = room_id!("!room:example.org");
4254 // Room is not present so the client won't be able to find it. The call will
4255 // timeout.
4256 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4257 }
4258
4259 #[async_test]
4260 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4261 let server = MatrixMockServer::new().await;
4262 let client = server.client_builder().build().await;
4263
4264 server.mock_create_room().ok().mount().await;
4265
4266 // Create a room in the internal store
4267 let room = client
4268 .create_room(assign!(CreateRoomRequest::new(), {
4269 invite: vec![],
4270 is_direct: false,
4271 }))
4272 .await
4273 .unwrap();
4274
4275 // Room is locally present, but not synced, the call will timeout
4276 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4277 .await
4278 .unwrap_err();
4279 }
4280
4281 #[async_test]
4282 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4283 let server = MatrixMockServer::new().await;
4284 let client = server.client_builder().build().await;
4285
4286 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4287
4288 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4289 assert_matches!(ret, Ok(true));
4290 }
4291
4292 #[async_test]
4293 async fn test_is_room_alias_available_if_alias_is_resolved() {
4294 let server = MatrixMockServer::new().await;
4295 let client = server.client_builder().build().await;
4296
4297 server
4298 .mock_room_directory_resolve_alias()
4299 .ok("!some_room_id:matrix.org", Vec::new())
4300 .expect(1)
4301 .mount()
4302 .await;
4303
4304 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4305 assert_matches!(ret, Ok(false));
4306 }
4307
4308 #[async_test]
4309 async fn test_is_room_alias_available_if_error_found() {
4310 let server = MatrixMockServer::new().await;
4311 let client = server.client_builder().build().await;
4312
4313 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4314
4315 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4316 assert_matches!(ret, Err(_));
4317 }
4318
4319 #[async_test]
4320 async fn test_create_room_alias() {
4321 let server = MatrixMockServer::new().await;
4322 let client = server.client_builder().build().await;
4323
4324 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4325
4326 let ret = client
4327 .create_room_alias(
4328 room_alias_id!("#some_alias:matrix.org"),
4329 room_id!("!some_room:matrix.org"),
4330 )
4331 .await;
4332 assert_matches!(ret, Ok(()));
4333 }
4334
4335 #[async_test]
4336 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4337 let server = MatrixMockServer::new().await;
4338 let client = server.client_builder().build().await;
4339
4340 let room_id = room_id!("!a-room:matrix.org");
4341
4342 // Make sure the summary endpoint is called once
4343 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4344
4345 // We create a locally cached invited room
4346 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4347
4348 // And we get a preview, the server endpoint was reached
4349 let preview = client
4350 .get_room_preview(room_id.into(), Vec::new())
4351 .await
4352 .expect("Room preview should be retrieved");
4353
4354 assert_eq!(invited_room.room_id(), preview.room_id);
4355 }
4356
4357 #[async_test]
4358 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4359 let server = MatrixMockServer::new().await;
4360 let client = server.client_builder().build().await;
4361
4362 let room_id = room_id!("!a-room:matrix.org");
4363
4364 // Make sure the summary endpoint is called once
4365 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4366
4367 // We create a locally cached left room
4368 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4369
4370 // And we get a preview, the server endpoint was reached
4371 let preview = client
4372 .get_room_preview(room_id.into(), Vec::new())
4373 .await
4374 .expect("Room preview should be retrieved");
4375
4376 assert_eq!(left_room.room_id(), preview.room_id);
4377 }
4378
4379 #[async_test]
4380 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4381 let server = MatrixMockServer::new().await;
4382 let client = server.client_builder().build().await;
4383
4384 let room_id = room_id!("!a-room:matrix.org");
4385
4386 // Make sure the summary endpoint is called once
4387 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4388
4389 // We create a locally cached knocked room
4390 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4391
4392 // And we get a preview, the server endpoint was reached
4393 let preview = client
4394 .get_room_preview(room_id.into(), Vec::new())
4395 .await
4396 .expect("Room preview should be retrieved");
4397
4398 assert_eq!(knocked_room.room_id(), preview.room_id);
4399 }
4400
4401 #[async_test]
4402 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4403 let server = MatrixMockServer::new().await;
4404 let client = server.client_builder().build().await;
4405
4406 let room_id = room_id!("!a-room:matrix.org");
4407
4408 // Make sure the summary endpoint is not called
4409 server.mock_room_summary().ok(room_id).never().mount().await;
4410
4411 // We create a locally cached joined room
4412 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4413
4414 // And we get a preview, no server endpoint was reached
4415 let preview = client
4416 .get_room_preview(room_id.into(), Vec::new())
4417 .await
4418 .expect("Room preview should be retrieved");
4419
4420 assert_eq!(joined_room.room_id(), preview.room_id);
4421 }
4422
4423 #[async_test]
4424 async fn test_media_preview_config() {
4425 let server = MatrixMockServer::new().await;
4426 let client = server.client_builder().build().await;
4427
4428 server
4429 .mock_sync()
4430 .ok_and_run(&client, |builder| {
4431 builder.add_custom_global_account_data(json!({
4432 "content": {
4433 "media_previews": "private",
4434 "invite_avatars": "off"
4435 },
4436 "type": "m.media_preview_config"
4437 }));
4438 })
4439 .await;
4440
4441 let (initial_value, stream) =
4442 client.account().observe_media_preview_config().await.unwrap();
4443
4444 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4445 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4446 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4447 pin_mut!(stream);
4448 assert_pending!(stream);
4449
4450 server
4451 .mock_sync()
4452 .ok_and_run(&client, |builder| {
4453 builder.add_custom_global_account_data(json!({
4454 "content": {
4455 "media_previews": "off",
4456 "invite_avatars": "on"
4457 },
4458 "type": "m.media_preview_config"
4459 }));
4460 })
4461 .await;
4462
4463 assert_next_matches!(
4464 stream,
4465 MediaPreviewConfigEventContent {
4466 media_previews: Some(MediaPreviews::Off),
4467 invite_avatars: Some(InviteAvatars::On),
4468 ..
4469 }
4470 );
4471 assert_pending!(stream);
4472 }
4473
4474 #[async_test]
4475 async fn test_unstable_media_preview_config() {
4476 let server = MatrixMockServer::new().await;
4477 let client = server.client_builder().build().await;
4478
4479 server
4480 .mock_sync()
4481 .ok_and_run(&client, |builder| {
4482 builder.add_custom_global_account_data(json!({
4483 "content": {
4484 "media_previews": "private",
4485 "invite_avatars": "off"
4486 },
4487 "type": "io.element.msc4278.media_preview_config"
4488 }));
4489 })
4490 .await;
4491
4492 let (initial_value, stream) =
4493 client.account().observe_media_preview_config().await.unwrap();
4494
4495 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4496 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4497 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4498 pin_mut!(stream);
4499 assert_pending!(stream);
4500
4501 server
4502 .mock_sync()
4503 .ok_and_run(&client, |builder| {
4504 builder.add_custom_global_account_data(json!({
4505 "content": {
4506 "media_previews": "off",
4507 "invite_avatars": "on"
4508 },
4509 "type": "io.element.msc4278.media_preview_config"
4510 }));
4511 })
4512 .await;
4513
4514 assert_next_matches!(
4515 stream,
4516 MediaPreviewConfigEventContent {
4517 media_previews: Some(MediaPreviews::Off),
4518 invite_avatars: Some(InviteAvatars::On),
4519 ..
4520 }
4521 );
4522 assert_pending!(stream);
4523 }
4524
4525 #[async_test]
4526 async fn test_media_preview_config_not_found() {
4527 let server = MatrixMockServer::new().await;
4528 let client = server.client_builder().build().await;
4529
4530 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4531
4532 assert!(initial_value.is_none());
4533 }
4534
4535 #[async_test]
4536 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4537 // The default Matrix version we use is 1.11 or higher, so authenticated media
4538 // is supported.
4539 let server = MatrixMockServer::new().await;
4540 let client = server.client_builder().build().await;
4541
4542 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4543
4544 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4545 client.load_or_fetch_max_upload_size().await.unwrap();
4546
4547 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4548 }
4549
4550 #[async_test]
4551 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4552 // The server must advertise support for the stable feature for authenticated
4553 // media support, so we mock the `GET /versions` response.
4554 let server = MatrixMockServer::new().await;
4555 let client = server.client_builder().no_server_versions().build().await;
4556
4557 server
4558 .mock_versions()
4559 .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4560 .with_feature("org.matrix.msc3916.stable", true)
4561 .ok()
4562 .named("versions")
4563 .expect(1)
4564 .mount()
4565 .await;
4566
4567 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4568
4569 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4570 client.load_or_fetch_max_upload_size().await.unwrap();
4571
4572 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4573 }
4574
4575 #[async_test]
4576 async fn test_load_or_fetch_max_upload_size_no_auth() {
4577 // The server must not support Matrix 1.11 or higher for unauthenticated
4578 // media requests, so we mock the `GET /versions` response.
4579 let server = MatrixMockServer::new().await;
4580 let client = server.client_builder().no_server_versions().build().await;
4581
4582 server
4583 .mock_versions()
4584 .with_versions(vec!["v1.1"])
4585 .ok()
4586 .named("versions")
4587 .expect(1)
4588 .mount()
4589 .await;
4590
4591 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4592
4593 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4594 client.load_or_fetch_max_upload_size().await.unwrap();
4595
4596 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4597 }
4598
4599 #[async_test]
4600 async fn test_uploading_a_too_large_media_file() {
4601 let server = MatrixMockServer::new().await;
4602 let client = server.client_builder().build().await;
4603
4604 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4605 client.load_or_fetch_max_upload_size().await.unwrap();
4606 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4607
4608 let data = vec![1, 2];
4609 let upload_request =
4610 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4611 let request = SendRequest {
4612 client: client.clone(),
4613 request: upload_request,
4614 config: None,
4615 send_progress: SharedObservable::new(TransmissionProgress::default()),
4616 };
4617 let media_request = SendMediaUploadRequest::new(request);
4618
4619 let error = media_request.await.err();
4620 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4621 assert_eq!(max, uint!(1));
4622 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4623 }
4624
4625 #[async_test]
4626 async fn test_dont_ignore_timeout_on_first_sync() {
4627 let server = MatrixMockServer::new().await;
4628 let client = server.client_builder().build().await;
4629
4630 server
4631 .mock_sync()
4632 .timeout(Some(Duration::from_secs(30)))
4633 .ok(|_| {})
4634 .mock_once()
4635 .named("sync_with_timeout")
4636 .mount()
4637 .await;
4638
4639 // Call the endpoint once to check the timeout.
4640 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4641
4642 timeout(Duration::from_secs(1), async {
4643 stream.next().await.unwrap().unwrap();
4644 })
4645 .await
4646 .unwrap();
4647 }
4648
4649 #[async_test]
4650 async fn test_ignore_timeout_on_first_sync() {
4651 let server = MatrixMockServer::new().await;
4652 let client = server.client_builder().build().await;
4653
4654 server
4655 .mock_sync()
4656 .timeout(None)
4657 .ok(|_| {})
4658 .mock_once()
4659 .named("sync_no_timeout")
4660 .mount()
4661 .await;
4662 server
4663 .mock_sync()
4664 .timeout(Some(Duration::from_secs(30)))
4665 .ok(|_| {})
4666 .mock_once()
4667 .named("sync_with_timeout")
4668 .mount()
4669 .await;
4670
4671 // Call each version of the endpoint once to check the timeouts.
4672 let mut stream = Box::pin(
4673 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4674 );
4675
4676 timeout(Duration::from_secs(1), async {
4677 stream.next().await.unwrap().unwrap();
4678 stream.next().await.unwrap().unwrap();
4679 })
4680 .await
4681 .unwrap();
4682 }
4683
4684 #[async_test]
4685 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4686 let server = MatrixMockServer::new().await;
4687 let client = server.client_builder().build().await;
4688 // This is the user ID that is inside MemberAdditional.
4689 // Note the confusing username, so we can share
4690 // GlobalAccountDataTestEvent::Direct with the invited test.
4691 let user_id = user_id!("@invited:localhost");
4692
4693 // When we receive a sync response saying "invited" is invited to a DM
4694 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4695 let response = SyncResponseBuilder::default()
4696 .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4697 .add_global_account_data(
4698 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4699 )
4700 .build_sync_response();
4701 client.base_client().receive_sync_response(response).await.unwrap();
4702
4703 // Then get_dm_room finds this room
4704 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4705 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4706 }
4707
4708 #[async_test]
4709 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4710 let server = MatrixMockServer::new().await;
4711 let client = server.client_builder().build().await;
4712 // This is the user ID that is inside MemberInvite
4713 let user_id = user_id!("@invited:localhost");
4714
4715 // When we receive a sync response saying "invited" is invited to a DM
4716 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4717 let response = SyncResponseBuilder::default()
4718 .add_joined_room(
4719 JoinedRoomBuilder::default()
4720 .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4721 )
4722 .add_global_account_data(
4723 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4724 )
4725 .build_sync_response();
4726 client.base_client().receive_sync_response(response).await.unwrap();
4727
4728 // Then get_dm_room finds this room
4729 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4730 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4731 }
4732
4733 #[async_test]
4734 async fn test_get_dm_room_still_finds_left_room() {
4735 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4736 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4737
4738 let server = MatrixMockServer::new().await;
4739 let client = server.client_builder().build().await;
4740 // This is the user ID that is inside MemberAdditional.
4741 // Note the confusing username, so we can share
4742 // GlobalAccountDataTestEvent::Direct with the invited test.
4743 let user_id = user_id!("@invited:localhost");
4744
4745 // When we receive a sync response saying "invited" has left a DM
4746 let f = EventFactory::new().sender(user_id);
4747 let response = SyncResponseBuilder::default()
4748 .add_joined_room(
4749 JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4750 )
4751 .add_global_account_data(
4752 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4753 )
4754 .build_sync_response();
4755 client.base_client().receive_sync_response(response).await.unwrap();
4756
4757 // Then get_dm_room finds this room
4758 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4759 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4760 }
4761
4762 #[async_test]
4763 async fn test_device_exists() {
4764 let server = MatrixMockServer::new().await;
4765 let client = server.client_builder().build().await;
4766
4767 server.mock_get_device().ok().expect(1).mount().await;
4768
4769 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4770 }
4771
4772 #[async_test]
4773 async fn test_device_exists_404() {
4774 let server = MatrixMockServer::new().await;
4775 let client = server.client_builder().build().await;
4776
4777 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4778 }
4779
4780 #[async_test]
4781 async fn test_device_exists_500() {
4782 let server = MatrixMockServer::new().await;
4783 let client = server.client_builder().build().await;
4784
4785 server.mock_get_device().error500().expect(1).mount().await;
4786
4787 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4788 }
4789
4790 #[async_test]
4791 async fn test_fetching_well_known_with_homeserver_url() {
4792 let server = MatrixMockServer::new().await;
4793 let client = server.client_builder().build().await;
4794 server.mock_well_known().ok().mount().await;
4795
4796 assert_matches!(client.fetch_client_well_known().await, Some(_));
4797 }
4798
4799 #[async_test]
4800 async fn test_fetching_well_known_with_server_name() {
4801 let server = MatrixMockServer::new().await;
4802 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4803
4804 server.mock_well_known().ok().mount().await;
4805
4806 let client = MockClientBuilder::new(None)
4807 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4808 .build()
4809 .await;
4810
4811 assert_matches!(client.fetch_client_well_known().await, Some(_));
4812 }
4813
4814 #[async_test]
4815 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4816 let server = MatrixMockServer::new().await;
4817 server.mock_well_known().ok().mount().await;
4818
4819 let user_id =
4820 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4821 let client = MockClientBuilder::new(None)
4822 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4823 .build()
4824 .await;
4825
4826 assert_matches!(client.fetch_client_well_known().await, Some(_));
4827 }
4828}