matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::{StreamExt, join};
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32 DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35 BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36 SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37 SyncOutsideWasm, ThreadingSupport,
38 event_cache::store::EventCacheStoreLock,
39 media::store::MediaStoreLock,
40 store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41 sync::{Notification, RoomUpdates},
42 task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50 api::{
51 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52 client::{
53 account::whoami,
54 alias::{create_alias, delete_alias, get_alias},
55 authenticated_media,
56 device::{self, delete_devices, get_devices, update_device},
57 directory::{get_public_rooms, get_public_rooms_filtered},
58 discovery::{discover_homeserver, get_supported_versions},
59 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
60 knock::knock_room,
61 media,
62 membership::{join_room_by_id, join_room_by_id_or_alias},
63 room::create_room,
64 rtc::RtcTransport,
65 session::login::v3::DiscoveryInfo,
66 sync::sync_events,
67 threads::get_thread_subscriptions_changes,
68 uiaa,
69 user_directory::search_users,
70 },
71 error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
72 path_builder::PathBuilder,
73 },
74 assign,
75 events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
76 push::Ruleset,
77 time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
82use url::Url;
83
84use self::{
85 caches::{Cache, CachedValue, ClientCaches},
86 futures::SendRequest,
87};
88use crate::{
89 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
90 Room, SessionTokens, TransmissionProgress,
91 authentication::{
92 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
93 oauth::OAuth,
94 },
95 client::{
96 homeserver_capabilities::HomeserverCapabilities,
97 thread_subscriptions::ThreadSubscriptionCatchup,
98 },
99 config::{RequestConfig, SyncToken},
100 deduplicating_handler::DeduplicatingHandler,
101 error::HttpResult,
102 event_cache::EventCache,
103 event_handler::{
104 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
105 EventHandlerStore, ObservableEventHandler, SyncEvent,
106 },
107 http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
108 latest_events::LatestEvents,
109 live_locations_observer::BeaconInfoUpdate,
110 media::{MediaError, MediaFetcher},
111 notification_settings::NotificationSettings,
112 room::RoomMember,
113 room_preview::RoomPreview,
114 send_queue::{SendQueue, SendQueueData},
115 sliding_sync::Version as SlidingSyncVersion,
116 sync::{RoomUpdate, SyncResponse},
117};
118#[cfg(feature = "e2e-encryption")]
119use crate::{
120 cross_process_lock::CrossProcessLock,
121 encryption::{
122 DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
123 VerificationState,
124 },
125};
126
127mod builder;
128pub(crate) mod caches;
129pub(crate) mod futures;
130pub(crate) mod homeserver_capabilities;
131pub(crate) mod thread_subscriptions;
132
133pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
134#[cfg(feature = "experimental-search")]
135use crate::search_index::SearchIndex;
136
137#[cfg(not(target_family = "wasm"))]
138type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
139#[cfg(target_family = "wasm")]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
141
142#[cfg(not(target_family = "wasm"))]
143type NotificationHandlerFn =
144 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
145#[cfg(target_family = "wasm")]
146type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
147
148/// Enum controlling if a loop running callbacks should continue or abort.
149///
150/// This is mainly used in the [`sync_with_callback`] method, the return value
151/// of the provided callback controls if the sync loop should be exited.
152///
153/// [`sync_with_callback`]: #method.sync_with_callback
154#[derive(Debug, Clone, Copy, PartialEq, Eq)]
155pub enum LoopCtrl {
156 /// Continue running the loop.
157 Continue,
158 /// Break out of the loop.
159 Break,
160}
161
162/// Represents changes that can occur to a `Client`s `Session`.
163#[derive(Debug, Clone, PartialEq)]
164pub enum SessionChange {
165 /// The session's token is no longer valid.
166 UnknownToken(UnknownTokenErrorData),
167 /// The session's tokens have been refreshed.
168 TokensRefreshed,
169}
170
171/// Information about the server vendor obtained from the federation API.
172#[derive(Debug, Clone, PartialEq, Eq)]
173#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
174pub struct ServerVendorInfo {
175 /// The server name.
176 pub server_name: String,
177 /// The server version.
178 pub version: String,
179}
180
181/// Information about a map tile server advertised by the homeserver through the
182/// `tile_server` field of the matrix client well-known (MSC3488).
183#[derive(Debug, Clone, PartialEq, Eq, Hash)]
184#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
185pub struct TileServerInfo {
186 /// The URL of a map tile server's `style.json` file. See the
187 /// [Mapbox Style Specification](https://docs.mapbox.com/mapbox-gl-js/style-spec/)
188 /// for more details.
189 pub map_style_url: String,
190}
191
192impl From<discover_homeserver::TileServerInfo> for TileServerInfo {
193 fn from(value: discover_homeserver::TileServerInfo) -> Self {
194 Self { map_style_url: value.map_style_url }
195 }
196}
197
198/// An async/await enabled Matrix client.
199///
200/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
201#[derive(Clone)]
202pub struct Client {
203 pub(crate) inner: Arc<ClientInner>,
204}
205
206#[derive(Default)]
207pub(crate) struct ClientLocks {
208 /// Lock ensuring that only a single room may be marked as a DM at once.
209 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
210 /// explanation.
211 pub(crate) mark_as_dm_lock: Mutex<()>,
212
213 /// Lock ensuring that only a single secret store is getting opened at the
214 /// same time.
215 ///
216 /// This is important so we don't accidentally create multiple different new
217 /// default secret storage keys.
218 #[cfg(feature = "e2e-encryption")]
219 pub(crate) open_secret_store_lock: Mutex<()>,
220
221 /// Lock ensuring that we're only storing a single secret at a time.
222 ///
223 /// Take a look at the [`SecretStore::put_secret`] method for a more
224 /// detailed explanation.
225 ///
226 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
227 #[cfg(feature = "e2e-encryption")]
228 pub(crate) store_secret_lock: Mutex<()>,
229
230 /// Lock ensuring that only one method at a time might modify our backup.
231 #[cfg(feature = "e2e-encryption")]
232 pub(crate) backup_modify_lock: Mutex<()>,
233
234 /// Lock ensuring that we're going to attempt to upload backups for a single
235 /// requester.
236 #[cfg(feature = "e2e-encryption")]
237 pub(crate) backup_upload_lock: Mutex<()>,
238
239 /// Handler making sure we only have one group session sharing request in
240 /// flight per room.
241 #[cfg(feature = "e2e-encryption")]
242 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
243
244 /// Lock making sure we're only doing one key claim request at a time.
245 #[cfg(feature = "e2e-encryption")]
246 pub(crate) key_claim_lock: Mutex<()>,
247
248 /// Handler to ensure that only one members request is running at a time,
249 /// given a room.
250 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
251
252 /// Handler to ensure that only one encryption state request is running at a
253 /// time, given a room.
254 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
255
256 /// Deduplicating handler for sending read receipts. The string is an
257 /// internal implementation detail, see [`Self::send_single_receipt`].
258 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
259
260 #[cfg(feature = "e2e-encryption")]
261 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
262
263 /// Latest "generation" of data known by the crypto store.
264 ///
265 /// This is a counter that only increments, set in the database (and can
266 /// wrap). It's incremented whenever some process acquires a lock for the
267 /// first time. *This assumes the crypto store lock is being held, to
268 /// avoid data races on writing to this value in the store*.
269 ///
270 /// The current process will maintain this value in local memory and in the
271 /// DB over time. Observing a different value than the one read in
272 /// memory, when reading from the store indicates that somebody else has
273 /// written into the database under our feet.
274 ///
275 /// TODO: this should live in the `OlmMachine`, since it's information
276 /// related to the lock. As of today (2023-07-28), we blow up the entire
277 /// olm machine when there's a generation mismatch. So storing the
278 /// generation in the olm machine would make the client think there's
279 /// *always* a mismatch, and that's why we need to store the generation
280 /// outside the `OlmMachine`.
281 #[cfg(feature = "e2e-encryption")]
282 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
283}
284
285pub(crate) struct ClientInner {
286 /// All the data related to authentication and authorization.
287 pub(crate) auth_ctx: Arc<AuthCtx>,
288
289 /// The URL of the server.
290 ///
291 /// Not to be confused with the `Self::homeserver`. `server` is usually
292 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
293 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
294 /// homeserver (at the time of writing — 2024-08-28).
295 ///
296 /// This value is optional depending on how the `Client` has been built.
297 /// If it's been built from a homeserver URL directly, we don't know the
298 /// server. However, if the `Client` has been built from a server URL or
299 /// name, then the homeserver has been discovered, and we know both.
300 server: Option<Url>,
301
302 /// The URL of the homeserver to connect to.
303 ///
304 /// This is the URL for the client-server Matrix API.
305 homeserver: StdRwLock<Url>,
306
307 /// The sliding sync version.
308 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
309
310 /// The underlying HTTP client.
311 pub(crate) http_client: HttpClient,
312
313 /// User session data.
314 pub(super) base_client: BaseClient,
315
316 /// Collection of in-memory caches for the [`Client`].
317 pub(crate) caches: ClientCaches,
318
319 /// Collection of locks individual client methods might want to use, either
320 /// to ensure that only a single call to a method happens at once or to
321 /// deduplicate multiple calls to a method.
322 pub(crate) locks: ClientLocks,
323
324 /// The cross-process lock configuration.
325 ///
326 /// The SDK provides cross-process store locks (see
327 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
328 /// [`CrossProcessLockConfig::MultiProcess`] is used.
329 ///
330 /// If multiple `Client`s are running in different processes, this
331 /// value MUST be different for each `Client`.
332 cross_process_lock_config: CrossProcessLockConfig,
333
334 /// A mapping of the times at which the current user sent typing notices,
335 /// keyed by room.
336 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
337
338 /// Event handlers. See `add_event_handler`.
339 pub(crate) event_handlers: EventHandlerStore,
340
341 /// Notification handlers. See `register_notification_handler`.
342 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
343
344 /// The sender-side of channels used to receive room updates.
345 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
346
347 /// The sender-side of a channel used to observe all the room updates of a
348 /// sync response.
349 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
350
351 /// Whether the client should update its homeserver URL with the discovery
352 /// information present in the login response.
353 respect_login_well_known: bool,
354
355 /// An event that can be listened on to wait for a successful sync. The
356 /// event will only be fired if a sync loop is running. Can be used for
357 /// synchronization, e.g. if we send out a request to create a room, we can
358 /// wait for the sync to get the data to fetch a room object from the state
359 /// store.
360 pub(crate) sync_beat: event_listener::Event,
361
362 /// A central cache for events, inactive first.
363 ///
364 /// It becomes active when [`EventCache::subscribe`] is called.
365 pub(crate) event_cache: OnceCell<EventCache>,
366
367 /// End-to-end encryption related state.
368 #[cfg(feature = "e2e-encryption")]
369 pub(crate) e2ee: EncryptionData,
370
371 /// The verification state of our own device.
372 #[cfg(feature = "e2e-encryption")]
373 pub(crate) verification_state: SharedObservable<VerificationState>,
374
375 /// Whether to enable the experimental support for sending and receiving
376 /// encrypted room history on invite, per [MSC4268].
377 ///
378 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
379 #[cfg(feature = "e2e-encryption")]
380 pub(crate) enable_share_history_on_invite: bool,
381
382 /// Data related to the [`SendQueue`].
383 ///
384 /// [`SendQueue`]: crate::send_queue::SendQueue
385 pub(crate) send_queue_data: Arc<SendQueueData>,
386
387 /// The `max_upload_size` value of the homeserver, it contains the max
388 /// request size you can send.
389 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
390
391 /// The entry point to get the [`LatestEvent`] of rooms and threads.
392 ///
393 /// [`LatestEvent`]: crate::latest_event::LatestEvent
394 latest_events: OnceCell<LatestEvents>,
395
396 /// Service handling the catching up of thread subscriptions in the
397 /// background.
398 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
399
400 #[cfg(feature = "experimental-search")]
401 /// Handler for [`RoomIndex`]'s of each room
402 search_index: SearchIndex,
403
404 /// A monitor for background tasks spawned by the client.
405 pub(crate) task_monitor: TaskMonitor,
406
407 /// A sender to notify subscribers about duplicate key upload errors
408 /// triggered by requests to /keys/upload.
409 #[cfg(feature = "e2e-encryption")]
410 pub(crate) duplicate_key_upload_error_sender:
411 broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
412
413 pub(crate) media_fetcher: Arc<dyn MediaFetcher>,
414}
415
416impl ClientInner {
417 /// Create a new `ClientInner`.
418 ///
419 /// All the fields passed as parameters here are those that must be cloned
420 /// upon instantiation of a sub-client, e.g. a client specialized for
421 /// notifications.
422 #[allow(clippy::too_many_arguments)]
423 async fn new(
424 auth_ctx: Arc<AuthCtx>,
425 server: Option<Url>,
426 homeserver: Url,
427 sliding_sync_version: SlidingSyncVersion,
428 http_client: HttpClient,
429 base_client: BaseClient,
430 supported_versions: CachedValue<TtlValue<SupportedVersions>>,
431 well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
432 respect_login_well_known: bool,
433 event_cache: OnceCell<EventCache>,
434 send_queue: Arc<SendQueueData>,
435 latest_events: OnceCell<LatestEvents>,
436 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
437 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
438 cross_process_lock_config: CrossProcessLockConfig,
439 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
440 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
441 media_fetcher: Arc<dyn MediaFetcher>,
442 ) -> Arc<Self> {
443 let caches = ClientCaches {
444 supported_versions: Cache::with_value(supported_versions),
445 well_known: Cache::with_value(well_known),
446 server_metadata: Cache::new(),
447 homeserver_capabilities: Cache::new(),
448 };
449
450 let client = Self {
451 server,
452 homeserver: StdRwLock::new(homeserver),
453 auth_ctx,
454 sliding_sync_version: StdRwLock::new(sliding_sync_version),
455 http_client,
456 base_client,
457 caches,
458 locks: Default::default(),
459 cross_process_lock_config,
460 typing_notice_times: Default::default(),
461 event_handlers: Default::default(),
462 notification_handlers: Default::default(),
463 room_update_channels: Default::default(),
464 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
465 // ballast for all observers to catch up.
466 room_updates_sender: broadcast::Sender::new(32),
467 respect_login_well_known,
468 sync_beat: event_listener::Event::new(),
469 event_cache,
470 send_queue_data: send_queue,
471 latest_events,
472 #[cfg(feature = "e2e-encryption")]
473 e2ee: EncryptionData::new(encryption_settings),
474 #[cfg(feature = "e2e-encryption")]
475 verification_state: SharedObservable::new(VerificationState::Unknown),
476 #[cfg(feature = "e2e-encryption")]
477 enable_share_history_on_invite,
478 server_max_upload_size: Mutex::new(OnceCell::new()),
479 #[cfg(feature = "experimental-search")]
480 search_index: search_index_handler,
481 thread_subscription_catchup,
482 task_monitor: TaskMonitor::new(),
483 #[cfg(feature = "e2e-encryption")]
484 duplicate_key_upload_error_sender: broadcast::channel(1).0,
485 media_fetcher,
486 };
487
488 #[allow(clippy::let_and_return)]
489 let client = Arc::new(client);
490
491 #[cfg(feature = "e2e-encryption")]
492 client.e2ee.initialize_tasks(&client);
493
494 let init_event_cache = client.event_cache.get_or_init(|| async {
495 EventCache::new(&client, client.base_client.event_cache_store().clone())
496 });
497
498 let init_thread_subscription_catchup = client
499 .thread_subscription_catchup
500 .get_or_init(|| ThreadSubscriptionCatchup::new(Client { inner: client.clone() }));
501
502 let _ = join!(init_event_cache, init_thread_subscription_catchup);
503
504 client
505 }
506}
507
508#[cfg(not(tarpaulin_include))]
509impl Debug for Client {
510 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
511 write!(fmt, "Client")
512 }
513}
514
515impl Client {
516 /// Create a new [`Client`] that will use the given homeserver.
517 ///
518 /// # Arguments
519 ///
520 /// * `homeserver_url` - The homeserver that the client should connect to.
521 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
522 Self::builder().homeserver_url(homeserver_url).build().await
523 }
524
525 /// Returns a subscriber that publishes an event every time the ignore user
526 /// list changes.
527 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
528 self.inner.base_client.subscribe_to_ignore_user_list_changes()
529 }
530
531 /// Create a new [`ClientBuilder`].
532 pub fn builder() -> ClientBuilder {
533 ClientBuilder::new()
534 }
535
536 pub(crate) fn base_client(&self) -> &BaseClient {
537 &self.inner.base_client
538 }
539
540 /// The underlying HTTP client.
541 pub fn http_client(&self) -> &reqwest::Client {
542 &self.inner.http_client.inner
543 }
544
545 pub(crate) fn locks(&self) -> &ClientLocks {
546 &self.inner.locks
547 }
548
549 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
550 &self.inner.auth_ctx
551 }
552
553 /// The cross-process store lock configuration used by this [`Client`].
554 ///
555 /// The SDK provides cross-process store locks (see
556 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
557 /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
558 /// the value used for all cross-process store locks used by this
559 /// `Client`.
560 pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
561 &self.inner.cross_process_lock_config
562 }
563
564 /// Change the homeserver URL used by this client.
565 ///
566 /// # Arguments
567 ///
568 /// * `homeserver_url` - The new URL to use.
569 fn set_homeserver(&self, homeserver_url: Url) {
570 *self.inner.homeserver.write().unwrap() = homeserver_url;
571 }
572
573 /// Change to a different homeserver and re-resolve well-known.
574 #[cfg(feature = "e2e-encryption")]
575 pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
576 &self,
577 homeserver_url: Url,
578 ) -> Result<()> {
579 self.set_homeserver(homeserver_url);
580 self.reset_well_known().await?;
581 if let Some(well_known) = self.well_known().await {
582 self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
583 }
584 Ok(())
585 }
586
587 /// Retrieves a helper component to access the [`HomeserverCapabilities`]
588 /// supported or disabled by the homeserver.
589 pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
590 HomeserverCapabilities::new(self.clone())
591 }
592
593 /// Get the server vendor information from the federation API.
594 ///
595 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
596 /// both the server's software name and version.
597 ///
598 /// # Examples
599 ///
600 /// ```no_run
601 /// # use matrix_sdk::Client;
602 /// # use url::Url;
603 /// # async {
604 /// # let homeserver = Url::parse("http://example.com")?;
605 /// let client = Client::new(homeserver).await?;
606 ///
607 /// let server_info = client.server_vendor_info(None).await?;
608 /// println!(
609 /// "Server: {}, Version: {}",
610 /// server_info.server_name, server_info.version
611 /// );
612 /// # anyhow::Ok(()) };
613 /// ```
614 #[cfg(feature = "federation-api")]
615 pub async fn server_vendor_info(
616 &self,
617 request_config: Option<RequestConfig>,
618 ) -> HttpResult<ServerVendorInfo> {
619 use ruma::api::federation::discovery::get_server_version;
620
621 let res = self
622 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
623 .await?;
624
625 // Extract server info, using defaults if fields are missing.
626 let server = res.server.unwrap_or_default();
627 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
628 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
629
630 Ok(ServerVendorInfo { server_name: server_name_str, version })
631 }
632
633 /// Get a copy of the default request config.
634 ///
635 /// The default request config is what's used when sending requests if no
636 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
637 /// function with such a parameter.
638 ///
639 /// If the default request config was not customized through
640 /// [`ClientBuilder`] when creating this `Client`, the returned value will
641 /// be equivalent to [`RequestConfig::default()`].
642 pub fn request_config(&self) -> RequestConfig {
643 self.inner.http_client.request_config
644 }
645
646 /// Check whether the client has been activated.
647 ///
648 /// A client is considered active when:
649 ///
650 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
651 /// is logged in,
652 /// 2. Has loaded cached data from storage,
653 /// 3. If encryption is enabled, it also initialized or restored its
654 /// `OlmMachine`.
655 pub fn is_active(&self) -> bool {
656 self.inner.base_client.is_active()
657 }
658
659 /// The server used by the client.
660 ///
661 /// See `Self::server` to learn more.
662 pub fn server(&self) -> Option<&Url> {
663 self.inner.server.as_ref()
664 }
665
666 /// The homeserver of the client.
667 pub fn homeserver(&self) -> Url {
668 self.inner.homeserver.read().unwrap().clone()
669 }
670
671 /// Get the sliding sync version.
672 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
673 self.inner.sliding_sync_version.read().unwrap().clone()
674 }
675
676 /// Override the sliding sync version.
677 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
678 let mut lock = self.inner.sliding_sync_version.write().unwrap();
679 *lock = version;
680 }
681
682 /// Get the Matrix user session meta information.
683 ///
684 /// If the client is currently logged in, this will return a
685 /// [`SessionMeta`] object which contains the user ID and device ID.
686 /// Otherwise it returns `None`.
687 pub fn session_meta(&self) -> Option<&SessionMeta> {
688 self.base_client().session_meta()
689 }
690
691 /// Returns a receiver that gets events for each room info update. To watch
692 /// for new events, use `receiver.resubscribe()`.
693 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
694 self.base_client().room_info_notable_update_receiver()
695 }
696
697 /// Performs a search for users.
698 /// The search is performed case-insensitively on user IDs and display names
699 ///
700 /// # Arguments
701 ///
702 /// * `search_term` - The search term for the search
703 /// * `limit` - The maximum number of results to return. Defaults to 10.
704 ///
705 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
706 pub async fn search_users(
707 &self,
708 search_term: &str,
709 limit: u64,
710 ) -> HttpResult<search_users::v3::Response> {
711 let mut request = search_users::v3::Request::new(search_term.to_owned());
712
713 if let Some(limit) = UInt::new(limit) {
714 request.limit = limit;
715 }
716
717 self.send(request).await
718 }
719
720 /// Get the user id of the current owner of the client.
721 pub fn user_id(&self) -> Option<&UserId> {
722 self.session_meta().map(|s| s.user_id.as_ref())
723 }
724
725 /// Get the device ID that identifies the current session.
726 pub fn device_id(&self) -> Option<&DeviceId> {
727 self.session_meta().map(|s| s.device_id.as_ref())
728 }
729
730 /// Get the current access token for this session.
731 ///
732 /// Will be `None` if the client has not been logged in.
733 pub fn access_token(&self) -> Option<String> {
734 self.auth_ctx().access_token()
735 }
736
737 /// Get the current tokens for this session.
738 ///
739 /// To be notified of changes in the session tokens, use
740 /// [`Client::subscribe_to_session_changes()`] or
741 /// [`Client::set_session_callbacks()`].
742 ///
743 /// Returns `None` if the client has not been logged in.
744 pub fn session_tokens(&self) -> Option<SessionTokens> {
745 self.auth_ctx().session_tokens()
746 }
747
748 /// Access the authentication API used to log in this client.
749 ///
750 /// Will be `None` if the client has not been logged in.
751 pub fn auth_api(&self) -> Option<AuthApi> {
752 match self.auth_ctx().auth_data.get()? {
753 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
754 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
755 }
756 }
757
758 /// Get the whole session info of this client.
759 ///
760 /// Will be `None` if the client has not been logged in.
761 ///
762 /// Can be used with [`Client::restore_session`] to restore a previously
763 /// logged-in session.
764 pub fn session(&self) -> Option<AuthSession> {
765 match self.auth_api()? {
766 AuthApi::Matrix(api) => api.session().map(Into::into),
767 AuthApi::OAuth(api) => api.full_session().map(Into::into),
768 }
769 }
770
771 /// Get a reference to the state store.
772 pub fn state_store(&self) -> &DynStateStore {
773 self.base_client().state_store()
774 }
775
776 /// Get a reference to the event cache store.
777 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
778 self.base_client().event_cache_store()
779 }
780
781 /// Get a reference to the media store.
782 pub fn media_store(&self) -> &MediaStoreLock {
783 self.base_client().media_store()
784 }
785
786 /// Access the native Matrix authentication API with this client.
787 pub fn matrix_auth(&self) -> MatrixAuth {
788 MatrixAuth::new(self.clone())
789 }
790
791 /// Get the account of the current owner of the client.
792 pub fn account(&self) -> Account {
793 Account::new(self.clone())
794 }
795
796 /// Get the encryption manager of the client.
797 #[cfg(feature = "e2e-encryption")]
798 pub fn encryption(&self) -> Encryption {
799 Encryption::new(self.clone())
800 }
801
802 /// Get the media manager of the client.
803 pub fn media(&self) -> Media {
804 Media::new(self.clone(), self.inner.media_fetcher.clone())
805 }
806
807 /// Get the pusher manager of the client.
808 pub fn pusher(&self) -> Pusher {
809 Pusher::new(self.clone())
810 }
811
812 /// Access the OAuth 2.0 API of the client.
813 pub fn oauth(&self) -> OAuth {
814 OAuth::new(self.clone())
815 }
816
817 /// Register a handler for a specific event type.
818 ///
819 /// The handler is a function or closure with one or more arguments. The
820 /// first argument is the event itself. All additional arguments are
821 /// "context" arguments: They have to implement [`EventHandlerContext`].
822 /// This trait is named that way because most of the types implementing it
823 /// give additional context about an event: The room it was in, its raw form
824 /// and other similar things. As two exceptions to this,
825 /// [`Client`] and [`EventHandlerHandle`] also implement the
826 /// `EventHandlerContext` trait so you don't have to clone your client
827 /// into the event handler manually and a handler can decide to remove
828 /// itself.
829 ///
830 /// Some context arguments are not universally applicable. A context
831 /// argument that isn't available for the given event type will result in
832 /// the event handler being skipped and an error being logged. The following
833 /// context argument types are only available for a subset of event types:
834 ///
835 /// * [`Room`] is only available for room-specific events, i.e. not for
836 /// events like global account data events or presence events.
837 ///
838 /// You can provide custom context via
839 /// [`add_event_handler_context`](Client::add_event_handler_context) and
840 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
841 /// into the event handler.
842 ///
843 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
844 ///
845 /// # Examples
846 ///
847 /// ```no_run
848 /// use matrix_sdk::{
849 /// deserialized_responses::EncryptionInfo,
850 /// event_handler::Ctx,
851 /// ruma::{
852 /// events::{
853 /// macros::EventContent,
854 /// push_rules::PushRulesEvent,
855 /// room::{
856 /// message::SyncRoomMessageEvent,
857 /// topic::SyncRoomTopicEvent,
858 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
859 /// },
860 /// },
861 /// push::Action,
862 /// Int, MilliSecondsSinceUnixEpoch,
863 /// },
864 /// Client, Room,
865 /// };
866 /// use serde::{Deserialize, Serialize};
867 ///
868 /// # async fn example(client: Client) {
869 /// client.add_event_handler(
870 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
871 /// // Common usage: Room event plus room and client.
872 /// },
873 /// );
874 /// client.add_event_handler(
875 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
876 /// async move {
877 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
878 /// // unencrypted events and events that were decrypted by the SDK.
879 /// }
880 /// },
881 /// );
882 /// client.add_event_handler(
883 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
884 /// async move {
885 /// // A `Vec<Action>` parameter allows you to know which push actions
886 /// // are applicable for an event. For example, an event with
887 /// // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
888 /// // should be highlighted in the timeline.
889 /// }
890 /// },
891 /// );
892 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
893 /// // You can omit any or all arguments after the first.
894 /// });
895 ///
896 /// // Registering a temporary event handler:
897 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
898 /// /* Event handler */
899 /// });
900 /// client.remove_event_handler(handle);
901 ///
902 /// // Registering custom event handler context:
903 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
904 /// struct MyContext {
905 /// number: usize,
906 /// }
907 /// client.add_event_handler_context(MyContext { number: 5 });
908 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
909 /// // Use the context
910 /// });
911 ///
912 /// // This will handle membership events in joined rooms. Invites are special, see below.
913 /// client.add_event_handler(
914 /// |ev: SyncRoomMemberEvent| async move {},
915 /// );
916 ///
917 /// // To handle state events in invited rooms (including invite membership events),
918 /// // `StrippedRoomMemberEvent` should be used.
919 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
920 /// client.add_event_handler(
921 /// |ev: StrippedRoomMemberEvent| async move {},
922 /// );
923 ///
924 /// // Custom events work exactly the same way, you just need to declare
925 /// // the content struct and use the EventContent derive macro on it.
926 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
927 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
928 /// struct TokenEventContent {
929 /// token: String,
930 /// #[serde(rename = "exp")]
931 /// expires_at: MilliSecondsSinceUnixEpoch,
932 /// }
933 ///
934 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
935 /// todo!("Display the token");
936 /// });
937 ///
938 /// // Event handler closures can also capture local variables.
939 /// // Make sure they are cheap to clone though, because they will be cloned
940 /// // every time the closure is called.
941 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
942 ///
943 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
944 /// println!("Calling the handler with identifier {data}");
945 /// });
946 /// # }
947 /// ```
948 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
949 where
950 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
951 H: EventHandler<Ev, Ctx>,
952 {
953 self.add_event_handler_impl(handler, None)
954 }
955
956 /// Register a handler for a specific room, and event type.
957 ///
958 /// This method works the same way as
959 /// [`add_event_handler`][Self::add_event_handler], except that the handler
960 /// will only be called for events in the room with the specified ID. See
961 /// that method for more details on event handler functions.
962 ///
963 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
964 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
965 /// your use case.
966 pub fn add_room_event_handler<Ev, Ctx, H>(
967 &self,
968 room_id: &RoomId,
969 handler: H,
970 ) -> EventHandlerHandle
971 where
972 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
973 H: EventHandler<Ev, Ctx>,
974 {
975 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
976 }
977
978 /// Observe a specific event type.
979 ///
980 /// `Ev` represents the kind of event that will be observed. `Ctx`
981 /// represents the context that will come with the event. It relies on the
982 /// same mechanism as [`Client::add_event_handler`]. The main difference is
983 /// that it returns an [`ObservableEventHandler`] and doesn't require a
984 /// user-defined closure. It is possible to subscribe to the
985 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
986 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
987 /// Ctx)`.
988 ///
989 /// Be careful that only the most recent value can be observed. Subscribers
990 /// are notified when a new value is sent, but there is no guarantee
991 /// that they will see all values.
992 ///
993 /// # Example
994 ///
995 /// Let's see a classical usage:
996 ///
997 /// ```
998 /// use futures_util::StreamExt as _;
999 /// use matrix_sdk::{
1000 /// Client, Room,
1001 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
1002 /// };
1003 ///
1004 /// # async fn example(client: Client) -> Option<()> {
1005 /// let observer =
1006 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1007 ///
1008 /// let mut subscriber = observer.subscribe();
1009 ///
1010 /// let (event, (room, push_actions)) = subscriber.next().await?;
1011 /// # Some(())
1012 /// # }
1013 /// ```
1014 ///
1015 /// Now let's see how to get several contexts that can be useful for you:
1016 ///
1017 /// ```
1018 /// use matrix_sdk::{
1019 /// Client, Room,
1020 /// deserialized_responses::EncryptionInfo,
1021 /// ruma::{
1022 /// events::room::{
1023 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1024 /// },
1025 /// push::Action,
1026 /// },
1027 /// };
1028 ///
1029 /// # async fn example(client: Client) {
1030 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1031 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1032 ///
1033 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1034 /// // to distinguish between unencrypted events and events that were decrypted
1035 /// // by the SDK.
1036 /// let _ = client
1037 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1038 /// );
1039 ///
1040 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1041 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1042 /// // should be highlighted in the timeline.
1043 /// let _ =
1044 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1045 ///
1046 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1047 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1048 /// # }
1049 /// ```
1050 ///
1051 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1052 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1053 where
1054 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1055 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1056 {
1057 self.observe_room_events_impl(None)
1058 }
1059
1060 /// Observe a specific room, and event type.
1061 ///
1062 /// This method works the same way as [`Client::observe_events`], except
1063 /// that the observability will only be applied for events in the room with
1064 /// the specified ID. See that method for more details.
1065 ///
1066 /// Be careful that only the most recent value can be observed. Subscribers
1067 /// are notified when a new value is sent, but there is no guarantee
1068 /// that they will see all values.
1069 pub fn observe_room_events<Ev, Ctx>(
1070 &self,
1071 room_id: &RoomId,
1072 ) -> ObservableEventHandler<(Ev, Ctx)>
1073 where
1074 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1075 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1076 {
1077 self.observe_room_events_impl(Some(room_id.to_owned()))
1078 }
1079
1080 /// Shared implementation for `Client::observe_events` and
1081 /// `Client::observe_room_events`.
1082 fn observe_room_events_impl<Ev, Ctx>(
1083 &self,
1084 room_id: Option<OwnedRoomId>,
1085 ) -> ObservableEventHandler<(Ev, Ctx)>
1086 where
1087 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1088 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1089 {
1090 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1091 // new value.
1092 let shared_observable = SharedObservable::new(None);
1093
1094 ObservableEventHandler::new(
1095 shared_observable.clone(),
1096 self.event_handler_drop_guard(self.add_event_handler_impl(
1097 move |event: Ev, context: Ctx| {
1098 shared_observable.set(Some((event, context)));
1099
1100 ready(())
1101 },
1102 room_id,
1103 )),
1104 )
1105 }
1106
1107 /// Subscribe to future `beacon_info` updates for the current user across
1108 /// all rooms.
1109 ///
1110 /// This stream is push-only: it emits only future updates observed during
1111 /// sync processing and does not replay existing state.
1112 pub fn observe_own_beacon_info_updates(
1113 &self,
1114 ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1115 let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1116 let mut stream = observer.subscribe();
1117 let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1118 Ok(async_stream::stream! {
1119 let _observer = observer;
1120
1121 while let Some((event, room)) = stream.next().await {
1122 if event.state_key != own_user_id {
1123 continue;
1124 }
1125 yield BeaconInfoUpdate {
1126 room_id: room.room_id().to_owned(),
1127 event_id: event.event_id,
1128 content: event.content,
1129 };
1130 }
1131 })
1132 }
1133
1134 /// Remove the event handler associated with the handle.
1135 ///
1136 /// Note that you **must not** call `remove_event_handler` from the
1137 /// non-async part of an event handler, that is:
1138 ///
1139 /// ```ignore
1140 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1141 /// // ⚠ this will cause a deadlock ⚠
1142 /// client.remove_event_handler(handle);
1143 ///
1144 /// async move {
1145 /// // removing the event handler here is fine
1146 /// client.remove_event_handler(handle);
1147 /// }
1148 /// })
1149 /// ```
1150 ///
1151 /// Note also that handlers that remove themselves will still execute with
1152 /// events received in the same sync cycle.
1153 ///
1154 /// # Arguments
1155 ///
1156 /// `handle` - The [`EventHandlerHandle`] that is returned when
1157 /// registering the event handler with [`Client::add_event_handler`].
1158 ///
1159 /// # Examples
1160 ///
1161 /// ```no_run
1162 /// # use url::Url;
1163 /// # use tokio::sync::mpsc;
1164 /// #
1165 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1166 /// #
1167 /// use matrix_sdk::{
1168 /// Client, event_handler::EventHandlerHandle,
1169 /// ruma::events::room::member::SyncRoomMemberEvent,
1170 /// };
1171 /// #
1172 /// # futures_executor::block_on(async {
1173 /// # let client = matrix_sdk::Client::builder()
1174 /// # .homeserver_url(homeserver)
1175 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1176 /// # .build()
1177 /// # .await
1178 /// # .unwrap();
1179 ///
1180 /// client.add_event_handler(
1181 /// |ev: SyncRoomMemberEvent,
1182 /// client: Client,
1183 /// handle: EventHandlerHandle| async move {
1184 /// // Common usage: Check arriving Event is the expected one
1185 /// println!("Expected RoomMemberEvent received!");
1186 /// client.remove_event_handler(handle);
1187 /// },
1188 /// );
1189 /// # });
1190 /// ```
1191 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1192 self.inner.event_handlers.remove(handle);
1193 }
1194
1195 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1196 /// the given handle.
1197 ///
1198 /// When the returned value is dropped, the event handler will be removed.
1199 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1200 EventHandlerDropGuard::new(handle, self.clone())
1201 }
1202
1203 /// Add an arbitrary value for use as event handler context.
1204 ///
1205 /// The value can be obtained in an event handler by adding an argument of
1206 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1207 ///
1208 /// If a value of the same type has been added before, it will be
1209 /// overwritten.
1210 ///
1211 /// # Examples
1212 ///
1213 /// ```no_run
1214 /// use matrix_sdk::{
1215 /// Room, event_handler::Ctx,
1216 /// ruma::events::room::message::SyncRoomMessageEvent,
1217 /// };
1218 /// # #[derive(Clone)]
1219 /// # struct SomeType;
1220 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1221 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1222 /// # futures_executor::block_on(async {
1223 /// # let client = matrix_sdk::Client::builder()
1224 /// # .homeserver_url(homeserver)
1225 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1226 /// # .build()
1227 /// # .await
1228 /// # .unwrap();
1229 ///
1230 /// // Handle used to send messages to the UI part of the app
1231 /// let my_gui_handle: SomeType = obtain_gui_handle();
1232 ///
1233 /// client.add_event_handler_context(my_gui_handle.clone());
1234 /// client.add_event_handler(
1235 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1236 /// async move {
1237 /// // gui_handle.send(DisplayMessage { message: ev });
1238 /// }
1239 /// },
1240 /// );
1241 /// # });
1242 /// ```
1243 pub fn add_event_handler_context<T>(&self, ctx: T)
1244 where
1245 T: Clone + Send + Sync + 'static,
1246 {
1247 self.inner.event_handlers.add_context(ctx);
1248 }
1249
1250 /// Register a handler for a notification.
1251 ///
1252 /// Similar to [`Client::add_event_handler`], but only allows functions
1253 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1254 /// [`Client`] for now.
1255 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1256 where
1257 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1258 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1259 {
1260 self.inner.notification_handlers.write().await.push(Box::new(
1261 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1262 ));
1263
1264 self
1265 }
1266
1267 /// Subscribe to all updates for the room with the given ID.
1268 ///
1269 /// The returned receiver will receive a new message for each sync response
1270 /// that contains updates for that room.
1271 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1272 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1273 btree_map::Entry::Vacant(entry) => {
1274 let (tx, rx) = broadcast::channel(8);
1275 entry.insert(tx);
1276 rx
1277 }
1278 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1279 }
1280 }
1281
1282 /// Subscribe to all updates to all rooms, whenever any has been received in
1283 /// a sync response.
1284 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1285 self.inner.room_updates_sender.subscribe()
1286 }
1287
1288 pub(crate) async fn notification_handlers(
1289 &self,
1290 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1291 self.inner.notification_handlers.read().await
1292 }
1293
1294 /// Get all the rooms the client knows about.
1295 ///
1296 /// This will return the list of joined, invited, and left rooms.
1297 pub fn rooms(&self) -> Vec<Room> {
1298 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1299 }
1300
1301 /// Get all the rooms the client knows about, filtered by room state.
1302 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1303 self.base_client()
1304 .rooms_filtered(filter)
1305 .into_iter()
1306 .map(|room| Room::new(self.clone(), room))
1307 .collect()
1308 }
1309
1310 /// Get a stream of all the rooms, in addition to the existing rooms.
1311 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1312 let (rooms, stream) = self.base_client().rooms_stream();
1313
1314 let map_room = |room| Room::new(self.clone(), room);
1315
1316 (
1317 rooms.into_iter().map(map_room).collect(),
1318 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1319 )
1320 }
1321
1322 /// Returns the joined rooms this client knows about.
1323 pub fn joined_rooms(&self) -> Vec<Room> {
1324 self.rooms_filtered(RoomStateFilter::JOINED)
1325 }
1326
1327 /// Returns the invited rooms this client knows about.
1328 pub fn invited_rooms(&self) -> Vec<Room> {
1329 self.rooms_filtered(RoomStateFilter::INVITED)
1330 }
1331
1332 /// Returns the left rooms this client knows about.
1333 pub fn left_rooms(&self) -> Vec<Room> {
1334 self.rooms_filtered(RoomStateFilter::LEFT)
1335 }
1336
1337 /// Returns the joined space rooms this client knows about.
1338 pub fn joined_space_rooms(&self) -> Vec<Room> {
1339 self.base_client()
1340 .rooms_filtered(RoomStateFilter::JOINED)
1341 .into_iter()
1342 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1343 .collect()
1344 }
1345
1346 /// Get a room with the given room id.
1347 ///
1348 /// # Arguments
1349 ///
1350 /// `room_id` - The unique id of the room that should be fetched.
1351 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1352 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1353 }
1354
1355 /// Gets the preview of a room, whether the current user has joined it or
1356 /// not.
1357 pub async fn get_room_preview(
1358 &self,
1359 room_or_alias_id: &RoomOrAliasId,
1360 via: Vec<OwnedServerName>,
1361 ) -> Result<RoomPreview> {
1362 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1363 Ok(room_id) => room_id.to_owned(),
1364 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1365 };
1366
1367 if let Some(room) = self.get_room(&room_id) {
1368 // The cached data can only be trusted if the room state is joined or
1369 // banned: for invite and knock rooms, no updates will be received
1370 // for the rooms after the invite/knock action took place so we may
1371 // have very out to date data for important fields such as
1372 // `join_rule`. For left rooms, the homeserver should return the latest info.
1373 match room.state() {
1374 RoomState::Joined | RoomState::Banned => {
1375 return Ok(RoomPreview::from_known_room(&room).await);
1376 }
1377 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1378 }
1379 }
1380
1381 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1382 }
1383
1384 /// Resolve a room alias to a room id and a list of servers which know
1385 /// about it.
1386 ///
1387 /// # Arguments
1388 ///
1389 /// `room_alias` - The room alias to be resolved.
1390 pub async fn resolve_room_alias(
1391 &self,
1392 room_alias: &RoomAliasId,
1393 ) -> HttpResult<get_alias::v3::Response> {
1394 let request = get_alias::v3::Request::new(room_alias.to_owned());
1395 self.send(request).await
1396 }
1397
1398 /// Checks if a room alias is not in use yet.
1399 ///
1400 /// Returns:
1401 /// - `Ok(true)` if the room alias is available.
1402 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1403 /// status code).
1404 /// - An `Err` otherwise.
1405 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1406 match self.resolve_room_alias(alias).await {
1407 // The room alias was resolved, so it's already in use.
1408 Ok(_) => Ok(false),
1409 Err(error) => {
1410 match error.client_api_error_kind() {
1411 // The room alias wasn't found, so it's available.
1412 Some(ErrorKind::NotFound) => Ok(true),
1413 _ => Err(error),
1414 }
1415 }
1416 }
1417 }
1418
1419 /// Adds a new room alias associated with a room to the room directory.
1420 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1421 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1422 self.send(request).await?;
1423 Ok(())
1424 }
1425
1426 /// Removes a room alias from the room directory.
1427 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1428 let request = delete_alias::v3::Request::new(alias.to_owned());
1429 self.send(request).await?;
1430 Ok(())
1431 }
1432
1433 /// Update the homeserver from the login response well-known if needed.
1434 ///
1435 /// # Arguments
1436 ///
1437 /// * `login_well_known` - The `well_known` field from a successful login
1438 /// response.
1439 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1440 if self.inner.respect_login_well_known
1441 && let Some(well_known) = login_well_known
1442 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1443 {
1444 self.set_homeserver(homeserver);
1445 }
1446 }
1447
1448 /// Similar to [`Client::restore_session_with`], with
1449 /// [`RoomLoadSettings::default()`].
1450 ///
1451 /// # Panics
1452 ///
1453 /// Panics if a session was already restored or logged in.
1454 #[instrument(skip_all)]
1455 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1456 self.restore_session_with(session, RoomLoadSettings::default()).await
1457 }
1458
1459 /// Restore a session previously logged-in using one of the available
1460 /// authentication APIs. The number of rooms to restore is controlled by
1461 /// [`RoomLoadSettings`].
1462 ///
1463 /// See the documentation of the corresponding authentication API's
1464 /// `restore_session` method for more information.
1465 ///
1466 /// # Panics
1467 ///
1468 /// Panics if a session was already restored or logged in.
1469 #[instrument(skip_all)]
1470 pub async fn restore_session_with(
1471 &self,
1472 session: impl Into<AuthSession>,
1473 room_load_settings: RoomLoadSettings,
1474 ) -> Result<()> {
1475 let session = session.into();
1476 match session {
1477 AuthSession::Matrix(session) => {
1478 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1479 }
1480 AuthSession::OAuth(session) => {
1481 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1482 }
1483 }
1484 }
1485
1486 /// Refresh the access token using the authentication API used to log into
1487 /// this session.
1488 ///
1489 /// See the documentation of the authentication API's `refresh_access_token`
1490 /// method for more information.
1491 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1492 let Some(auth_api) = self.auth_api() else {
1493 return Err(RefreshTokenError::RefreshTokenRequired);
1494 };
1495
1496 match auth_api {
1497 AuthApi::Matrix(api) => {
1498 trace!("Token refresh: Using the homeserver.");
1499 Box::pin(api.refresh_access_token()).await?;
1500 }
1501 AuthApi::OAuth(api) => {
1502 trace!("Token refresh: Using OAuth 2.0.");
1503 Box::pin(api.refresh_access_token()).await?;
1504 }
1505 }
1506
1507 Ok(())
1508 }
1509
1510 /// Log out the current session using the proper authentication API.
1511 ///
1512 /// # Errors
1513 ///
1514 /// Returns an error if the session is not authenticated or if an error
1515 /// occurred while making the request to the server.
1516 pub async fn logout(&self) -> Result<(), Error> {
1517 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1518 match auth_api {
1519 AuthApi::Matrix(matrix_auth) => {
1520 matrix_auth.logout().await?;
1521 Ok(())
1522 }
1523 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1524 }
1525 }
1526
1527 /// Get or upload a sync filter.
1528 ///
1529 /// This method will either get a filter ID from the store or upload the
1530 /// filter definition to the homeserver and return the new filter ID.
1531 ///
1532 /// # Arguments
1533 ///
1534 /// * `filter_name` - The unique name of the filter, this name will be used
1535 /// locally to store and identify the filter ID returned by the server.
1536 ///
1537 /// * `definition` - The filter definition that should be uploaded to the
1538 /// server if no filter ID can be found in the store.
1539 ///
1540 /// # Examples
1541 ///
1542 /// ```no_run
1543 /// # use matrix_sdk::{
1544 /// # Client, config::SyncSettings,
1545 /// # ruma::api::client::{
1546 /// # filter::{
1547 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1548 /// # },
1549 /// # sync::sync_events::v3::Filter,
1550 /// # }
1551 /// # };
1552 /// # use url::Url;
1553 /// # async {
1554 /// # let homeserver = Url::parse("http://example.com").unwrap();
1555 /// # let client = Client::new(homeserver).await.unwrap();
1556 /// let mut filter = FilterDefinition::default();
1557 ///
1558 /// // Let's enable member lazy loading.
1559 /// filter.room.state.lazy_load_options =
1560 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1561 ///
1562 /// let filter_id = client
1563 /// .get_or_upload_filter("sync", filter)
1564 /// .await
1565 /// .unwrap();
1566 ///
1567 /// let sync_settings = SyncSettings::new()
1568 /// .filter(Filter::FilterId(filter_id));
1569 ///
1570 /// let response = client.sync_once(sync_settings).await.unwrap();
1571 /// # };
1572 #[instrument(skip(self, definition))]
1573 pub async fn get_or_upload_filter(
1574 &self,
1575 filter_name: &str,
1576 definition: FilterDefinition,
1577 ) -> Result<String> {
1578 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1579 debug!("Found filter locally");
1580 Ok(filter)
1581 } else {
1582 debug!("Didn't find filter locally");
1583 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1584 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1585 let response = self.send(request).await?;
1586
1587 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1588
1589 Ok(response.filter_id)
1590 }
1591 }
1592
1593 /// Prepare to join a room by ID, by getting the current details about it
1594 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1595 let room = self.get_room(room_id)?;
1596
1597 let inviter = match room.invite_details().await {
1598 Ok(details) => details.inviter,
1599 Err(Error::WrongRoomState(_)) => None,
1600 Err(e) => {
1601 warn!("Error fetching invite details for room: {e:?}");
1602 None
1603 }
1604 };
1605
1606 Some(PreJoinRoomInfo { inviter })
1607 }
1608
1609 /// Finish joining a room.
1610 ///
1611 /// If the room was an invite that should be marked as a DM, will include it
1612 /// in the DM event after creating the joined room.
1613 ///
1614 /// If encrypted history sharing is enabled, will check to see if we have a
1615 /// key bundle, and import it if so.
1616 ///
1617 /// # Arguments
1618 ///
1619 /// * `room_id` - The `RoomId` of the room that was joined.
1620 /// * `pre_join_room_info` - Information about the room before we joined.
1621 async fn finish_join_room(
1622 &self,
1623 room_id: &RoomId,
1624 pre_join_room_info: Option<PreJoinRoomInfo>,
1625 ) -> Result<Room> {
1626 info!(?room_id, ?pre_join_room_info, "Completing room join");
1627 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1628 room.state() == RoomState::Invited
1629 && room.is_direct().await.unwrap_or_else(|e| {
1630 warn!(%room_id, "is_direct() failed: {e}");
1631 false
1632 })
1633 } else {
1634 false
1635 };
1636
1637 let base_room = self
1638 .base_client()
1639 .room_joined(
1640 room_id,
1641 pre_join_room_info
1642 .as_ref()
1643 .and_then(|info| info.inviter.as_ref())
1644 .map(|i| i.user_id().to_owned()),
1645 )
1646 .await?;
1647 let room = Room::new(self.clone(), base_room);
1648
1649 if mark_as_dm {
1650 room.set_is_direct(true).await?;
1651 }
1652
1653 // If we joined following an invite, check if we had previously received a key
1654 // bundle from the inviter, and import it if so.
1655 //
1656 // It's important that we only do this once `BaseClient::room_joined` has
1657 // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1658 // race.
1659 #[cfg(feature = "e2e-encryption")]
1660 if self.inner.enable_share_history_on_invite
1661 && let Some(inviter) =
1662 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1663 {
1664 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1665 .await?;
1666 }
1667
1668 // Suppress "unused variable" and "unused field" lints
1669 #[cfg(not(feature = "e2e-encryption"))]
1670 let _ = pre_join_room_info.map(|i| i.inviter);
1671
1672 Ok(room)
1673 }
1674
1675 /// Join a room by `RoomId`.
1676 ///
1677 /// Returns the `Room` in the joined state.
1678 ///
1679 /// # Arguments
1680 ///
1681 /// * `room_id` - The `RoomId` of the room to be joined.
1682 #[instrument(skip(self))]
1683 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1684 // See who invited us to this room, if anyone. Note we have to do this before
1685 // making the `/join` request, otherwise we could race against the sync.
1686 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1687
1688 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1689 let response = self.send(request).await?;
1690 self.finish_join_room(&response.room_id, pre_join_info).await
1691 }
1692
1693 /// Join a room by `RoomOrAliasId`.
1694 ///
1695 /// Returns the `Room` in the joined state.
1696 ///
1697 /// # Arguments
1698 ///
1699 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1700 /// alias looks like `#name:example.com`.
1701 /// * `server_names` - The server names to be used for resolving the alias,
1702 /// if needs be.
1703 #[instrument(skip(self))]
1704 pub async fn join_room_by_id_or_alias(
1705 &self,
1706 alias: &RoomOrAliasId,
1707 server_names: &[OwnedServerName],
1708 ) -> Result<Room> {
1709 let room_id = match <&RoomId>::try_from(alias) {
1710 Ok(room_id) => room_id,
1711 Err(room_alias) => &self.resolve_room_alias(room_alias).await?.room_id,
1712 };
1713 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1714 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1715 via: server_names.to_owned(),
1716 });
1717 let response = self.send(request).await?;
1718 self.finish_join_room(&response.room_id, pre_join_info).await
1719 }
1720
1721 /// Search the homeserver's directory of public rooms.
1722 ///
1723 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1724 /// a `get_public_rooms::Response`.
1725 ///
1726 /// # Arguments
1727 ///
1728 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1729 ///
1730 /// * `since` - Pagination token from a previous request.
1731 ///
1732 /// * `server` - The name of the server, if `None` the requested server is
1733 /// used.
1734 ///
1735 /// # Examples
1736 /// ```no_run
1737 /// use matrix_sdk::Client;
1738 /// # use url::Url;
1739 /// # let homeserver = Url::parse("http://example.com").unwrap();
1740 /// # let limit = Some(10);
1741 /// # let since = Some("since token");
1742 /// # let server = Some("servername.com".try_into().unwrap());
1743 /// # async {
1744 /// let mut client = Client::new(homeserver).await.unwrap();
1745 ///
1746 /// client.public_rooms(limit, since, server).await;
1747 /// # };
1748 /// ```
1749 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1750 pub async fn public_rooms(
1751 &self,
1752 limit: Option<u32>,
1753 since: Option<&str>,
1754 server: Option<&ServerName>,
1755 ) -> HttpResult<get_public_rooms::v3::Response> {
1756 let limit = limit.map(UInt::from);
1757
1758 let request = assign!(get_public_rooms::v3::Request::new(), {
1759 limit,
1760 since: since.map(ToOwned::to_owned),
1761 server: server.map(ToOwned::to_owned),
1762 });
1763 self.send(request).await
1764 }
1765
1766 /// Create a room with the given parameters.
1767 ///
1768 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1769 /// created room.
1770 ///
1771 /// If you want to create a direct message with one specific user, you can
1772 /// use [`create_dm`][Self::create_dm], which is more convenient than
1773 /// assembling the [`create_room::v3::Request`] yourself.
1774 ///
1775 /// If the `is_direct` field of the request is set to `true` and at least
1776 /// one user is invited, the room will be automatically added to the direct
1777 /// rooms in the account data.
1778 ///
1779 /// # Examples
1780 ///
1781 /// ```no_run
1782 /// use matrix_sdk::{
1783 /// Client,
1784 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1785 /// };
1786 /// # use url::Url;
1787 /// #
1788 /// # async {
1789 /// # let homeserver = Url::parse("http://example.com").unwrap();
1790 /// let request = CreateRoomRequest::new();
1791 /// let client = Client::new(homeserver).await.unwrap();
1792 /// assert!(client.create_room(request).await.is_ok());
1793 /// # };
1794 /// ```
1795 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1796 let invite = request.invite.clone();
1797 let is_direct_room = request.is_direct;
1798 let response = self.send(request).await?;
1799
1800 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1801
1802 let joined_room = Room::new(self.clone(), base_room);
1803
1804 if is_direct_room
1805 && !invite.is_empty()
1806 && let Err(error) =
1807 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1808 {
1809 // FIXME: Retry in the background
1810 error!("Failed to mark room as DM: {error}");
1811 }
1812
1813 Ok(joined_room)
1814 }
1815
1816 /// Create a DM room.
1817 ///
1818 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1819 /// given user being invited, the room marked `is_direct` and both the
1820 /// creator and invitee getting the default maximum power level.
1821 ///
1822 /// If the `e2e-encryption` feature is enabled, the room will also be
1823 /// encrypted.
1824 ///
1825 /// # Arguments
1826 ///
1827 /// * `user_id` - The ID of the user to create a DM for.
1828 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1829 #[cfg(feature = "e2e-encryption")]
1830 let initial_state = vec![
1831 InitialStateEvent::with_empty_state_key(
1832 RoomEncryptionEventContent::with_recommended_defaults(),
1833 )
1834 .to_raw_any(),
1835 ];
1836
1837 #[cfg(not(feature = "e2e-encryption"))]
1838 let initial_state = vec![];
1839
1840 let request = assign!(create_room::v3::Request::new(), {
1841 invite: vec![user_id.to_owned()],
1842 is_direct: true,
1843 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1844 initial_state,
1845 });
1846
1847 self.create_room(request).await
1848 }
1849
1850 /// Get the first existing DM room with the given user, if any.
1851 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1852 self.get_dm_rooms(user_id).next()
1853 }
1854
1855 /// Get an iterator with the existing DM rooms for the given user.
1856 pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1857 let rooms = self.joined_rooms();
1858
1859 let dm_definition = &self.base_client().dm_room_definition;
1860
1861 // Find the room we share with the `user_id` and only with `user_id`
1862 let rooms = rooms.into_iter().filter(move |r| {
1863 let targets = r.direct_targets();
1864 let targets_match =
1865 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1866 match dm_definition {
1867 DmRoomDefinition::MatrixSpec => targets_match,
1868 DmRoomDefinition::TwoMembers => {
1869 let service_members_count =
1870 r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1871 let active_non_service_members =
1872 r.active_members_count().saturating_sub(service_members_count);
1873 targets_match && active_non_service_members <= 2
1874 }
1875 }
1876 });
1877
1878 trace!(?user_id, ?rooms, "Found DM rooms with user");
1879 rooms
1880 }
1881
1882 /// Search the homeserver's directory for public rooms with a filter.
1883 ///
1884 /// # Arguments
1885 ///
1886 /// * `room_search` - The easiest way to create this request is using the
1887 /// `get_public_rooms_filtered::Request` itself.
1888 ///
1889 /// # Examples
1890 ///
1891 /// ```no_run
1892 /// # use url::Url;
1893 /// # use matrix_sdk::Client;
1894 /// # async {
1895 /// # let homeserver = Url::parse("http://example.com")?;
1896 /// use matrix_sdk::ruma::{
1897 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1898 /// };
1899 /// # let mut client = Client::new(homeserver).await?;
1900 ///
1901 /// let mut filter = Filter::new();
1902 /// filter.generic_search_term = Some("rust".to_owned());
1903 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1904 /// request.filter = filter;
1905 ///
1906 /// let response = client.public_rooms_filtered(request).await?;
1907 ///
1908 /// for room in response.chunk {
1909 /// println!("Found room {room:?}");
1910 /// }
1911 /// # anyhow::Ok(()) };
1912 /// ```
1913 pub async fn public_rooms_filtered(
1914 &self,
1915 request: get_public_rooms_filtered::v3::Request,
1916 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1917 self.send(request).await
1918 }
1919
1920 /// Send an arbitrary request to the server, without updating client state.
1921 ///
1922 /// **Warning:** Because this method *does not* update the client state, it
1923 /// is important to make sure that you account for this yourself, and
1924 /// use wrapper methods where available. This method should *only* be
1925 /// used if a wrapper method for the endpoint you'd like to use is not
1926 /// available.
1927 ///
1928 /// # Arguments
1929 ///
1930 /// * `request` - A filled out and valid request for the endpoint to be hit
1931 ///
1932 /// * `timeout` - An optional request timeout setting, this overrides the
1933 /// default request setting if one was set.
1934 ///
1935 /// # Examples
1936 ///
1937 /// ```no_run
1938 /// # use matrix_sdk::{Client, config::SyncSettings};
1939 /// # use url::Url;
1940 /// # async {
1941 /// # let homeserver = Url::parse("http://localhost:8080")?;
1942 /// # let mut client = Client::new(homeserver).await?;
1943 /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1944 ///
1945 /// // First construct the request you want to make
1946 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1947 /// // for all available Endpoints
1948 /// let user_id = owned_user_id!("@example:localhost");
1949 /// let request = profile::get_profile::v3::Request::new(user_id);
1950 ///
1951 /// // Start the request using Client::send()
1952 /// let response = client.send(request).await?;
1953 ///
1954 /// // Check the corresponding Response struct to find out what types are
1955 /// // returned
1956 /// # anyhow::Ok(()) };
1957 /// ```
1958 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1959 where
1960 Request: OutgoingRequest + Clone + Debug,
1961 Request::Authentication: SupportedAuthScheme,
1962 Request::PathBuilder: SupportedPathBuilder,
1963 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1964 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1965 {
1966 SendRequest {
1967 client: self.clone(),
1968 request,
1969 config: None,
1970 send_progress: Default::default(),
1971 }
1972 }
1973
1974 pub(crate) async fn send_inner<Request>(
1975 &self,
1976 request: Request,
1977 config: Option<RequestConfig>,
1978 send_progress: SharedObservable<TransmissionProgress>,
1979 ) -> HttpResult<Request::IncomingResponse>
1980 where
1981 Request: OutgoingRequest + Debug,
1982 Request::Authentication: SupportedAuthScheme,
1983 Request::PathBuilder: SupportedPathBuilder,
1984 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1985 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1986 {
1987 let homeserver = self.homeserver().to_string();
1988 let access_token = self.access_token();
1989 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1990
1991 let path_builder_input =
1992 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1993
1994 let result = self
1995 .inner
1996 .http_client
1997 .send(
1998 request,
1999 config,
2000 homeserver,
2001 access_token.as_deref(),
2002 path_builder_input,
2003 send_progress,
2004 )
2005 .await;
2006
2007 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2008 result.as_ref().map_err(HttpError::client_api_error_kind)
2009 && let Some(access_token) = &access_token
2010 {
2011 // Mark the access token as expired.
2012 self.auth_ctx().set_access_token_expired(access_token);
2013 }
2014
2015 result
2016 }
2017
2018 fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2019 _ = self
2020 .inner
2021 .auth_ctx
2022 .session_change_sender
2023 .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2024 }
2025
2026 /// Fetches server versions from network; no caching.
2027 pub async fn fetch_server_versions(
2028 &self,
2029 request_config: Option<RequestConfig>,
2030 ) -> HttpResult<get_supported_versions::Response> {
2031 // Since this was called by the user, try to refresh the access token if
2032 // necessary.
2033 self.fetch_server_versions_inner(false, request_config).await
2034 }
2035
2036 /// Fetches server versions from network; no caching.
2037 ///
2038 /// If the access token is expired and `failsafe` is `false`, this will
2039 /// attempt to refresh the access token, otherwise this will try to make an
2040 /// unauthenticated request instead.
2041 pub(crate) async fn fetch_server_versions_inner(
2042 &self,
2043 failsafe: bool,
2044 request_config: Option<RequestConfig>,
2045 ) -> HttpResult<get_supported_versions::Response> {
2046 if !failsafe {
2047 // `Client::send()` handles refreshing access tokens.
2048 return self
2049 .send(get_supported_versions::Request::new())
2050 .with_request_config(request_config)
2051 .await;
2052 }
2053
2054 let homeserver = self.homeserver().to_string();
2055
2056 // If we have a fresh access token, try with it first.
2057 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2058 && self.auth_ctx().has_valid_access_token()
2059 && let Some(access_token) = self.access_token()
2060 {
2061 let result = self
2062 .inner
2063 .http_client
2064 .send(
2065 get_supported_versions::Request::new(),
2066 request_config,
2067 homeserver.clone(),
2068 Some(&access_token),
2069 (),
2070 Default::default(),
2071 )
2072 .await;
2073
2074 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2075 result.as_ref().map_err(HttpError::client_api_error_kind)
2076 {
2077 // If the access token is actually expired, mark it as expired and fallback to
2078 // the unauthenticated request below.
2079 self.auth_ctx().set_access_token_expired(&access_token);
2080 } else {
2081 // If the request succeeded or it's an other error, just stop now.
2082 return result;
2083 }
2084 }
2085
2086 // Try without authentication.
2087 self.inner
2088 .http_client
2089 .send(
2090 get_supported_versions::Request::new(),
2091 request_config,
2092 homeserver.clone(),
2093 None,
2094 (),
2095 Default::default(),
2096 )
2097 .await
2098 }
2099
2100 /// Fetches client well_known from network; no caching.
2101 ///
2102 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2103 /// well-known contents.
2104 /// 2. If it's not, we try extracting the server name from the
2105 /// [`Client::user_id`] and building the server URL from it.
2106 /// 3. If we couldn't get the well-known contents with either the explicit
2107 /// server name or the implicit extracted one, we try the homeserver URL
2108 /// as a last resort.
2109 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2110 let homeserver = self.homeserver();
2111 let scheme = homeserver.scheme();
2112
2113 // Use the server name, either an explicit one or an implicit one taken from
2114 // the user id: sometimes we'll have only the homeserver url available and no
2115 // server name, but the server name can be extracted from the current user id.
2116 let server_url = self
2117 .server()
2118 .map(|server| server.to_string())
2119 // If the server name wasn't available, extract it from the user id and build a URL:
2120 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2121 // will be the same for the public server url, lacking a better candidate.
2122 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2123
2124 // If the server name is available, first try using it
2125 let response = if let Some(server_url) = server_url {
2126 // First try using the server name
2127 self.fetch_client_well_known_with_url(server_url).await
2128 } else {
2129 None
2130 };
2131
2132 // If we didn't get a well-known value yet, try with the homeserver url instead:
2133 if response.is_none() {
2134 // Sometimes people configure their well-known directly on the homeserver so use
2135 // this as a fallback when the server name is unknown.
2136 warn!(
2137 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2138 );
2139 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2140 } else {
2141 response
2142 }
2143 }
2144
2145 async fn fetch_client_well_known_with_url(
2146 &self,
2147 url: String,
2148 ) -> Option<discover_homeserver::Response> {
2149 let well_known = self
2150 .inner
2151 .http_client
2152 .send(
2153 discover_homeserver::Request::new(),
2154 Some(RequestConfig::short_retry()),
2155 url,
2156 None,
2157 (),
2158 Default::default(),
2159 )
2160 .await;
2161
2162 match well_known {
2163 Ok(well_known) => Some(well_known),
2164 Err(http_error) => {
2165 // It is perfectly valid to not have a well-known file.
2166 // Maybe we should check for a specific error code to be sure?
2167 warn!("Failed to fetch client well-known: {http_error}");
2168 None
2169 }
2170 }
2171 }
2172
2173 /// Load supported versions from storage, or fetch them from network and
2174 /// cache them.
2175 ///
2176 /// If `failsafe` is true, this will try to minimize side effects to avoid
2177 /// possible deadlocks.
2178 async fn fetch_supported_versions(
2179 &self,
2180 failsafe: bool,
2181 ) -> HttpResult<SupportedVersionsResponse> {
2182 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2183 let supported_versions = SupportedVersionsResponse {
2184 versions: server_versions.versions,
2185 unstable_features: server_versions.unstable_features,
2186 };
2187
2188 Ok(supported_versions)
2189 }
2190
2191 /// Get the Matrix versions and features supported by the homeserver by
2192 /// fetching them from the server or the cache.
2193 ///
2194 /// This is equivalent to calling both [`Client::server_versions()`] and
2195 /// [`Client::unstable_features()`]. To always fetch the result from the
2196 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2197 /// and then `.as_supported_versions()` on the response.
2198 ///
2199 /// # Examples
2200 ///
2201 /// ```no_run
2202 /// use ruma::api::{FeatureFlag, MatrixVersion};
2203 /// # use matrix_sdk::{Client, config::SyncSettings};
2204 /// # use url::Url;
2205 /// # async {
2206 /// # let homeserver = Url::parse("http://localhost:8080")?;
2207 /// # let mut client = Client::new(homeserver).await?;
2208 ///
2209 /// let supported = client.supported_versions().await?;
2210 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2211 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2212 ///
2213 /// let msc_x_feature = FeatureFlag::from("msc_x");
2214 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2215 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2216 /// # anyhow::Ok(()) };
2217 /// ```
2218 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2219 self.supported_versions_inner(false).await
2220 }
2221
2222 /// Get the Matrix versions and features supported by the homeserver by
2223 /// fetching them from the server or the cache.
2224 ///
2225 /// If `failsafe` is true, this will try to minimize side effects to avoid
2226 /// possible deadlocks.
2227 pub(crate) async fn supported_versions_inner(
2228 &self,
2229 failsafe: bool,
2230 ) -> HttpResult<SupportedVersions> {
2231 match self.supported_versions_cached_inner(failsafe).await {
2232 Ok(Some(value)) => {
2233 return Ok(value);
2234 }
2235 Ok(None) => {
2236 // The cache is empty, make a request.
2237 }
2238 Err(error) => {
2239 warn!("error when loading cached supported versions: {error}");
2240 // Fallthrough to make a request.
2241 }
2242 }
2243
2244 self.refresh_supported_versions_cache(failsafe).await
2245 }
2246
2247 /// Refresh the Matrix versions and features supported by the homeserver in
2248 /// the cache.
2249 ///
2250 /// If `failsafe` is true, this will try to minimize side effects to avoid
2251 /// possible deadlocks.
2252 async fn refresh_supported_versions_cache(
2253 &self,
2254 failsafe: bool,
2255 ) -> HttpResult<SupportedVersions> {
2256 let cached_supported_versions = &self.inner.caches.supported_versions;
2257
2258 let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
2259 Ok(guard) => guard,
2260 Err(_) => {
2261 // There is already a refresh in progress, wait for it to finish.
2262 let guard = cached_supported_versions.refresh_lock.lock().await;
2263
2264 if let Err(error) = guard.as_ref() {
2265 // There was an error in the previous refresh, return it.
2266 return Err(HttpError::Cached(error.clone()));
2267 }
2268
2269 // Reuse the data if it was cached and it hasn't expired.
2270 if let CachedValue::Cached(value) = cached_supported_versions.value()
2271 && !value.has_expired()
2272 {
2273 return Ok(value.into_data());
2274 }
2275
2276 // The data wasn't cached or has expired, we need to make another request.
2277 guard
2278 }
2279 };
2280
2281 let response = match self.fetch_supported_versions(failsafe).await {
2282 Ok(response) => {
2283 *supported_versions_guard = Ok(());
2284 TtlValue::new(response)
2285 }
2286 Err(error) => {
2287 let error = Arc::new(error);
2288 *supported_versions_guard = Err(error.clone());
2289 return Err(HttpError::Cached(error));
2290 }
2291 };
2292
2293 let supported_versions = response.as_ref().map(|response| response.supported_versions());
2294
2295 // Only cache the result if the request was authenticated.
2296 if self.auth_ctx().has_valid_access_token() {
2297 if let Err(err) = self
2298 .state_store()
2299 .set_kv_data(
2300 StateStoreDataKey::SupportedVersions,
2301 StateStoreDataValue::SupportedVersions(response),
2302 )
2303 .await
2304 {
2305 warn!("error when caching supported versions: {err}");
2306 }
2307
2308 cached_supported_versions.set_value(supported_versions.clone());
2309 }
2310
2311 Ok(supported_versions.into_data())
2312 }
2313
2314 /// Get the Matrix versions and features supported by the homeserver by
2315 /// fetching them from the cache.
2316 ///
2317 /// For a version of this function that fetches the supported versions and
2318 /// features from the homeserver if the [`SupportedVersions`] aren't
2319 /// found in the cache, take a look at the [`Client::supported_versions()`]
2320 /// method.
2321 ///
2322 /// If the data in the cache has expired, this will trigger a background
2323 /// task to refresh it.
2324 ///
2325 /// # Examples
2326 ///
2327 /// ```no_run
2328 /// use ruma::api::{FeatureFlag, MatrixVersion};
2329 /// # use matrix_sdk::{Client, config::SyncSettings};
2330 /// # use url::Url;
2331 /// # async {
2332 /// # let homeserver = Url::parse("http://localhost:8080")?;
2333 /// # let mut client = Client::new(homeserver).await?;
2334 ///
2335 /// let supported =
2336 /// if let Some(supported) = client.supported_versions_cached().await? {
2337 /// supported
2338 /// } else {
2339 /// client.fetch_server_versions(None).await?.as_supported_versions()
2340 /// };
2341 ///
2342 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2343 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2344 ///
2345 /// let msc_x_feature = FeatureFlag::from("msc_x");
2346 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2347 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2348 /// # anyhow::Ok(()) };
2349 /// ```
2350 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2351 self.supported_versions_cached_inner(false).await
2352 }
2353
2354 async fn supported_versions_cached_inner(
2355 &self,
2356 failsafe: bool,
2357 ) -> Result<Option<SupportedVersions>, StoreError> {
2358 let supported_versions_cache = &self.inner.caches.supported_versions;
2359
2360 let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2361 cached
2362 } else if let Some(stored) = self
2363 .state_store()
2364 .get_kv_data(StateStoreDataKey::SupportedVersions)
2365 .await?
2366 .and_then(|value| value.into_supported_versions())
2367 {
2368 let stored = stored.map(|response| response.supported_versions());
2369
2370 // Copy the data from the store in the in-memory cache.
2371 supported_versions_cache.set_value(stored.clone());
2372
2373 stored
2374 } else {
2375 return Ok(None);
2376 };
2377
2378 // Spawn a task to refresh the cache if it has expired and we have a valid
2379 // access token.
2380 if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2381 debug!("spawning task to refresh supported versions cache");
2382
2383 let client = self.clone();
2384 self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2385 if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2386 warn!("failed to refresh supported versions cache: {error}");
2387 }
2388 });
2389 }
2390
2391 Ok(Some(value.into_data()))
2392 }
2393
2394 /// Get the Matrix versions supported by the homeserver by fetching them
2395 /// from the server or the cache.
2396 ///
2397 /// # Examples
2398 ///
2399 /// ```no_run
2400 /// use ruma::api::MatrixVersion;
2401 /// # use matrix_sdk::{Client, config::SyncSettings};
2402 /// # use url::Url;
2403 /// # async {
2404 /// # let homeserver = Url::parse("http://localhost:8080")?;
2405 /// # let mut client = Client::new(homeserver).await?;
2406 ///
2407 /// let server_versions = client.server_versions().await?;
2408 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2409 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2410 /// # anyhow::Ok(()) };
2411 /// ```
2412 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2413 Ok(self.supported_versions().await?.versions)
2414 }
2415
2416 /// Get the unstable features supported by the homeserver by fetching them
2417 /// from the server or the cache.
2418 ///
2419 /// # Examples
2420 ///
2421 /// ```no_run
2422 /// use matrix_sdk::ruma::api::FeatureFlag;
2423 /// # use matrix_sdk::{Client, config::SyncSettings};
2424 /// # use url::Url;
2425 /// # async {
2426 /// # let homeserver = Url::parse("http://localhost:8080")?;
2427 /// # let mut client = Client::new(homeserver).await?;
2428 ///
2429 /// let msc_x_feature = FeatureFlag::from("msc_x");
2430 /// let unstable_features = client.unstable_features().await?;
2431 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2432 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2433 /// # anyhow::Ok(()) };
2434 /// ```
2435 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2436 Ok(self.supported_versions().await?.features)
2437 }
2438
2439 /// Empty the supported versions and unstable features cache.
2440 ///
2441 /// Since the SDK caches the supported versions, it's possible to have a
2442 /// stale entry in the cache. This functions makes it possible to force
2443 /// reset it.
2444 pub async fn reset_supported_versions(&self) -> Result<()> {
2445 // Empty the in-memory cache.
2446 self.inner.caches.supported_versions.reset();
2447
2448 // Empty the store cache.
2449 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2450 }
2451
2452 /// Get the well-known file of the homeserver from the cache.
2453 ///
2454 /// If the data in the cache has expired, this will trigger a background
2455 /// task to refresh it.
2456 async fn well_known_cached(
2457 &self,
2458 ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2459 let well_known_cache = &self.inner.caches.well_known;
2460
2461 let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2462 cached
2463 } else if let Some(stored) = self
2464 .state_store()
2465 .get_kv_data(StateStoreDataKey::WellKnown)
2466 .await?
2467 .and_then(|value| value.into_well_known())
2468 {
2469 // Copy the data from the store into the in-memory cache.
2470 well_known_cache.set_value(stored.clone());
2471
2472 stored
2473 } else {
2474 return Ok(CachedValue::NotSet);
2475 };
2476
2477 // Spawn a task to refresh the cache if it has expired.
2478 if value.has_expired() {
2479 debug!("spawning task to refresh well-known cache");
2480
2481 let client = self.clone();
2482 self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2483 client.refresh_well_known_cache().await;
2484 });
2485 }
2486
2487 Ok(CachedValue::Cached(value.into_data()))
2488 }
2489
2490 /// Refresh the well-known file of the homeserver in the cache.
2491 async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2492 let well_known_cache = &self.inner.caches.well_known;
2493
2494 let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2495 Ok(guard) => guard,
2496 Err(_) => {
2497 // There is already a refresh in progress, wait for it to finish.
2498 let guard = well_known_cache.refresh_lock.lock().await;
2499
2500 // A refresh can't fail because we ignore failures, so there shouldn't be an
2501 // error in the refresh lock.
2502
2503 // Reuse the data if it was cached and it hasn't expired.
2504 if let CachedValue::Cached(value) = well_known_cache.value()
2505 && !value.has_expired()
2506 {
2507 return value.into_data();
2508 }
2509
2510 // The data wasn't cached or has expired, we need to make another request.
2511 guard
2512 }
2513 };
2514
2515 let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2516
2517 if let Err(err) = self
2518 .state_store()
2519 .set_kv_data(
2520 StateStoreDataKey::WellKnown,
2521 StateStoreDataValue::WellKnown(well_known.clone()),
2522 )
2523 .await
2524 {
2525 warn!("error when caching well-known: {err}");
2526 }
2527
2528 well_known_cache.set_value(well_known.clone());
2529
2530 well_known.into_data()
2531 }
2532
2533 /// Get the well-known file of the homeserver by fetching it from the server
2534 /// or the cache.
2535 async fn well_known(&self) -> Option<WellKnownResponse> {
2536 match self.well_known_cached().await {
2537 Ok(CachedValue::Cached(value)) => {
2538 return value;
2539 }
2540 Ok(CachedValue::NotSet) => {
2541 // The cache is empty, make a request.
2542 }
2543 Err(error) => {
2544 warn!("error when loading cached well-known: {error}");
2545 // Fallthrough to make a request.
2546 }
2547 }
2548
2549 self.refresh_well_known_cache().await
2550 }
2551
2552 /// Get information about the homeserver's advertised RTC foci by fetching
2553 /// the well-known file from the server or the cache.
2554 ///
2555 /// # Examples
2556 /// ```no_run
2557 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::rtc::RtcTransport};
2558 /// # use url::Url;
2559 /// # async {
2560 /// # let homeserver = Url::parse("http://localhost:8080")?;
2561 /// # let mut client = Client::new(homeserver).await?;
2562 /// let rtc_foci = client.rtc_foci().await?;
2563 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2564 /// RtcTransport::LiveKit(info) => Some(info),
2565 /// _ => None,
2566 /// });
2567 /// if let Some(info) = default_livekit_focus_info {
2568 /// println!("Default LiveKit service URL: {}", info.service_url);
2569 /// }
2570 /// # anyhow::Ok(()) };
2571 /// ```
2572 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcTransport>> {
2573 let well_known = self.well_known().await;
2574
2575 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2576 }
2577
2578 /// Get information about the homeserver's advertised map tile server, if
2579 /// any, by fetching the well-known file from the server or the cache.
2580 ///
2581 /// Returns `None` if the homeserver has not advertised a tile server in its
2582 /// well-known, or if the well-known is otherwise unavailable.
2583 pub async fn tile_server(&self) -> Option<TileServerInfo> {
2584 self.well_known().await.and_then(|well_known| well_known.tile_server).map(Into::into)
2585 }
2586
2587 /// Empty the well-known cache.
2588 ///
2589 /// Since the SDK caches the well-known, it's possible to have a stale entry
2590 /// in the cache. This functions makes it possible to force reset it.
2591 pub async fn reset_well_known(&self) -> Result<()> {
2592 // Empty the in-memory caches.
2593 self.inner.caches.well_known.reset();
2594
2595 // Empty the store cache.
2596 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2597 }
2598
2599 /// Check whether MSC 4028 is enabled on the homeserver.
2600 ///
2601 /// # Examples
2602 ///
2603 /// ```no_run
2604 /// # use matrix_sdk::{Client, config::SyncSettings};
2605 /// # use url::Url;
2606 /// # async {
2607 /// # let homeserver = Url::parse("http://localhost:8080")?;
2608 /// # let mut client = Client::new(homeserver).await?;
2609 /// let msc4028_enabled =
2610 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2611 /// # anyhow::Ok(()) };
2612 /// ```
2613 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2614 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2615 }
2616
2617 /// Get information of all our own devices.
2618 ///
2619 /// # Examples
2620 ///
2621 /// ```no_run
2622 /// # use matrix_sdk::{Client, config::SyncSettings};
2623 /// # use url::Url;
2624 /// # async {
2625 /// # let homeserver = Url::parse("http://localhost:8080")?;
2626 /// # let mut client = Client::new(homeserver).await?;
2627 /// let response = client.devices().await?;
2628 ///
2629 /// for device in response.devices {
2630 /// println!(
2631 /// "Device: {} {}",
2632 /// device.device_id,
2633 /// device.display_name.as_deref().unwrap_or("")
2634 /// );
2635 /// }
2636 /// # anyhow::Ok(()) };
2637 /// ```
2638 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2639 let request = get_devices::v3::Request::new();
2640
2641 self.send(request).await
2642 }
2643
2644 /// Delete the given devices from the server.
2645 ///
2646 /// # Arguments
2647 ///
2648 /// * `devices` - The list of devices that should be deleted from the
2649 /// server.
2650 ///
2651 /// * `auth_data` - This request requires user interactive auth, the first
2652 /// request needs to set this to `None` and will always fail with an
2653 /// `UiaaResponse`. The response will contain information for the
2654 /// interactive auth and the same request needs to be made but this time
2655 /// with some `auth_data` provided.
2656 ///
2657 /// ```no_run
2658 /// # use matrix_sdk::{
2659 /// # ruma::{api::client::uiaa, owned_device_id},
2660 /// # Client, Error, config::SyncSettings,
2661 /// # };
2662 /// # use serde_json::json;
2663 /// # use url::Url;
2664 /// # use std::collections::BTreeMap;
2665 /// # async {
2666 /// # let homeserver = Url::parse("http://localhost:8080")?;
2667 /// # let mut client = Client::new(homeserver).await?;
2668 /// let devices = &[owned_device_id!("DEVICEID")];
2669 ///
2670 /// if let Err(e) = client.delete_devices(devices, None).await {
2671 /// if let Some(info) = e.as_uiaa_response() {
2672 /// let mut password = uiaa::Password::new(
2673 /// uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2674 /// "wordpass".to_owned(),
2675 /// );
2676 /// password.session = info.session.clone();
2677 ///
2678 /// client
2679 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2680 /// .await?;
2681 /// }
2682 /// }
2683 /// # anyhow::Ok(()) };
2684 pub async fn delete_devices(
2685 &self,
2686 devices: &[OwnedDeviceId],
2687 auth_data: Option<uiaa::AuthData>,
2688 ) -> HttpResult<delete_devices::v3::Response> {
2689 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2690 request.auth = auth_data;
2691
2692 self.send(request).await
2693 }
2694
2695 /// Change the display name of a device owned by the current user.
2696 ///
2697 /// Returns a `update_device::Response` which specifies the result
2698 /// of the operation.
2699 ///
2700 /// # Arguments
2701 ///
2702 /// * `device_id` - The ID of the device to change the display name of.
2703 /// * `display_name` - The new display name to set.
2704 pub async fn rename_device(
2705 &self,
2706 device_id: &DeviceId,
2707 display_name: &str,
2708 ) -> HttpResult<update_device::v3::Response> {
2709 let mut request = update_device::v3::Request::new(device_id.to_owned());
2710 request.display_name = Some(display_name.to_owned());
2711
2712 self.send(request).await
2713 }
2714
2715 /// Check whether a device with a specific ID exists on the server.
2716 ///
2717 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2718 /// with 404 and the underlying error otherwise.
2719 ///
2720 /// # Arguments
2721 ///
2722 /// * `device_id` - The ID of the device to query.
2723 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2724 let request = device::get_device::v3::Request::new(device_id);
2725 match self.send(request).await {
2726 Ok(_) => Ok(true),
2727 Err(err) => {
2728 if let Some(error) = err.as_client_api_error()
2729 && error.status_code == 404
2730 {
2731 Ok(false)
2732 } else {
2733 Err(err.into())
2734 }
2735 }
2736 }
2737 }
2738
2739 /// Synchronize the client's state with the latest state on the server.
2740 ///
2741 /// ## Syncing Events
2742 ///
2743 /// Messages or any other type of event need to be periodically fetched from
2744 /// the server, this is achieved by sending a `/sync` request to the server.
2745 ///
2746 /// The first sync is sent out without a [`token`]. The response of the
2747 /// first sync will contain a [`next_batch`] field which should then be
2748 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2749 /// don't receive the same events multiple times.
2750 ///
2751 /// ## Long Polling
2752 ///
2753 /// A sync should in the usual case always be in flight. The
2754 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2755 /// long the server will wait for new events before it will respond.
2756 /// The server will respond immediately if some new events arrive before the
2757 /// timeout has expired. If no changes arrive and the timeout expires an
2758 /// empty sync response will be sent to the client.
2759 ///
2760 /// This method of sending a request that may not receive a response
2761 /// immediately is called long polling.
2762 ///
2763 /// ## Filtering Events
2764 ///
2765 /// The number or type of messages and events that the client should receive
2766 /// from the server can be altered using a [`Filter`].
2767 ///
2768 /// Filters can be non-trivial and, since they will be sent with every sync
2769 /// request, they may take up a bunch of unnecessary bandwidth.
2770 ///
2771 /// Luckily filters can be uploaded to the server and reused using an unique
2772 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2773 /// method.
2774 ///
2775 /// # Arguments
2776 ///
2777 /// * `sync_settings` - Settings for the sync call, this allows us to set
2778 /// various options to configure the sync:
2779 /// * [`filter`] - To configure which events we receive and which get
2780 /// [filtered] by the server
2781 /// * [`timeout`] - To configure our [long polling] setup.
2782 /// * [`token`] - To tell the server which events we already received
2783 /// and where we wish to continue syncing.
2784 /// * [`full_state`] - To tell the server that we wish to receive all
2785 /// state events, regardless of our configured [`token`].
2786 /// * [`set_presence`] - To tell the server to set the presence and to
2787 /// which state.
2788 ///
2789 /// # Examples
2790 ///
2791 /// ```no_run
2792 /// # use url::Url;
2793 /// # async {
2794 /// # let homeserver = Url::parse("http://localhost:8080")?;
2795 /// # let username = "";
2796 /// # let password = "";
2797 /// use matrix_sdk::{
2798 /// Client, config::SyncSettings,
2799 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2800 /// };
2801 ///
2802 /// let client = Client::new(homeserver).await?;
2803 /// client.matrix_auth().login_username(username, password).send().await?;
2804 ///
2805 /// // Sync once so we receive the client state and old messages.
2806 /// client.sync_once(SyncSettings::default()).await?;
2807 ///
2808 /// // Register our handler so we start responding once we receive a new
2809 /// // event.
2810 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2811 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2812 /// });
2813 ///
2814 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2815 /// // from our `sync_once()` call automatically.
2816 /// client.sync(SyncSettings::default()).await;
2817 /// # anyhow::Ok(()) };
2818 /// ```
2819 ///
2820 /// [`sync`]: #method.sync
2821 /// [`SyncSettings`]: crate::config::SyncSettings
2822 /// [`token`]: crate::config::SyncSettings#method.token
2823 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2824 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2825 /// [`set_presence`]: ruma::presence::PresenceState
2826 /// [`filter`]: crate::config::SyncSettings#method.filter
2827 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2828 /// [`next_batch`]: SyncResponse#structfield.next_batch
2829 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2830 /// [long polling]: #long-polling
2831 /// [filtered]: #filtering-events
2832 #[instrument(skip(self))]
2833 pub async fn sync_once(
2834 &self,
2835 sync_settings: crate::config::SyncSettings,
2836 ) -> Result<SyncResponse> {
2837 // The sync might not return for quite a while due to the timeout.
2838 // We'll see if there's anything crypto related to send out before we
2839 // sync, i.e. if we closed our client after a sync but before the
2840 // crypto requests were sent out.
2841 //
2842 // This will mostly be a no-op.
2843 #[cfg(feature = "e2e-encryption")]
2844 if let Err(e) = self.send_outgoing_requests().await {
2845 error!(error = ?e, "Error while sending outgoing E2EE requests");
2846 }
2847
2848 let token = match sync_settings.token {
2849 SyncToken::Specific(token) => Some(token),
2850 SyncToken::NoToken => None,
2851 SyncToken::ReusePrevious => self.sync_token().await,
2852 };
2853
2854 let request = assign!(sync_events::v3::Request::new(), {
2855 filter: sync_settings.filter.map(|f| *f),
2856 since: token,
2857 full_state: sync_settings.full_state,
2858 set_presence: sync_settings.set_presence,
2859 timeout: sync_settings.timeout,
2860 use_state_after: true,
2861 });
2862 let mut request_config = self.request_config();
2863 if let Some(timeout) = sync_settings.timeout {
2864 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2865 request_config.timeout = Some(base_timeout + timeout);
2866 }
2867
2868 let response = self.send(request).with_request_config(request_config).await?;
2869 let next_batch = response.next_batch.clone();
2870 let response = self.process_sync(response).await?;
2871
2872 #[cfg(feature = "e2e-encryption")]
2873 if let Err(e) = self.send_outgoing_requests().await {
2874 error!(error = ?e, "Error while sending outgoing E2EE requests");
2875 }
2876
2877 self.inner.sync_beat.notify(usize::MAX);
2878
2879 Ok(SyncResponse::new(next_batch, response))
2880 }
2881
2882 /// Repeatedly synchronize the client state with the server.
2883 ///
2884 /// This method will only return on error, if cancellation is needed
2885 /// the method should be wrapped in a cancelable task or the
2886 /// [`Client::sync_with_callback`] method can be used or
2887 /// [`Client::sync_with_result_callback`] if you want to handle error
2888 /// cases in the loop, too.
2889 ///
2890 /// This method will internally call [`Client::sync_once`] in a loop.
2891 ///
2892 /// This method can be used with the [`Client::add_event_handler`]
2893 /// method to react to individual events. If you instead wish to handle
2894 /// events in a bulk manner the [`Client::sync_with_callback`],
2895 /// [`Client::sync_with_result_callback`] and
2896 /// [`Client::sync_stream`] methods can be used instead. Those methods
2897 /// repeatedly return the whole sync response.
2898 ///
2899 /// # Arguments
2900 ///
2901 /// * `sync_settings` - Settings for the sync call. *Note* that those
2902 /// settings will be only used for the first sync call. See the argument
2903 /// docs for [`Client::sync_once`] for more info.
2904 ///
2905 /// # Return
2906 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2907 /// up to the user of the API to check the error and decide whether the sync
2908 /// should continue or not.
2909 ///
2910 /// # Examples
2911 ///
2912 /// ```no_run
2913 /// # use url::Url;
2914 /// # async {
2915 /// # let homeserver = Url::parse("http://localhost:8080")?;
2916 /// # let username = "";
2917 /// # let password = "";
2918 /// use matrix_sdk::{
2919 /// Client, config::SyncSettings,
2920 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2921 /// };
2922 ///
2923 /// let client = Client::new(homeserver).await?;
2924 /// client.matrix_auth().login_username(&username, &password).send().await?;
2925 ///
2926 /// // Register our handler so we start responding once we receive a new
2927 /// // event.
2928 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2929 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2930 /// });
2931 ///
2932 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2933 /// // automatically.
2934 /// client.sync(SyncSettings::default()).await?;
2935 /// # anyhow::Ok(()) };
2936 /// ```
2937 ///
2938 /// [argument docs]: #method.sync_once
2939 /// [`sync_with_callback`]: #method.sync_with_callback
2940 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2941 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2942 }
2943
2944 /// Repeatedly call sync to synchronize the client state with the server.
2945 ///
2946 /// # Arguments
2947 ///
2948 /// * `sync_settings` - Settings for the sync call. *Note* that those
2949 /// settings will be only used for the first sync call. See the argument
2950 /// docs for [`Client::sync_once`] for more info.
2951 ///
2952 /// * `callback` - A callback that will be called every time a successful
2953 /// response has been fetched from the server. The callback must return a
2954 /// boolean which signalizes if the method should stop syncing. If the
2955 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2956 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2957 ///
2958 /// # Return
2959 /// The sync runs until an error occurs or the
2960 /// callback indicates that the Loop should stop. If the callback asked for
2961 /// a regular stop, the result will be `Ok(())` otherwise the
2962 /// `Err(Error)` is returned.
2963 ///
2964 /// # Examples
2965 ///
2966 /// The following example demonstrates how to sync forever while sending all
2967 /// the interesting events through a mpsc channel to another thread e.g. a
2968 /// UI thread.
2969 ///
2970 /// ```no_run
2971 /// # use std::time::Duration;
2972 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2973 /// # use url::Url;
2974 /// # async {
2975 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2976 /// # let mut client = Client::new(homeserver).await.unwrap();
2977 ///
2978 /// use tokio::sync::mpsc::channel;
2979 ///
2980 /// let (tx, rx) = channel(100);
2981 ///
2982 /// let sync_channel = &tx;
2983 /// let sync_settings = SyncSettings::new()
2984 /// .timeout(Duration::from_secs(30));
2985 ///
2986 /// client
2987 /// .sync_with_callback(sync_settings, |response| async move {
2988 /// let channel = sync_channel;
2989 /// for (room_id, room) in response.rooms.joined {
2990 /// for event in room.timeline.events {
2991 /// channel.send(event).await.unwrap();
2992 /// }
2993 /// }
2994 ///
2995 /// LoopCtrl::Continue
2996 /// })
2997 /// .await;
2998 /// };
2999 /// ```
3000 #[instrument(skip_all)]
3001 pub async fn sync_with_callback<C>(
3002 &self,
3003 sync_settings: crate::config::SyncSettings,
3004 callback: impl Fn(SyncResponse) -> C,
3005 ) -> Result<(), Error>
3006 where
3007 C: Future<Output = LoopCtrl>,
3008 {
3009 self.sync_with_result_callback(sync_settings, |result| async {
3010 Ok(callback(result?).await)
3011 })
3012 .await
3013 }
3014
3015 /// Repeatedly call sync to synchronize the client state with the server.
3016 ///
3017 /// # Arguments
3018 ///
3019 /// * `sync_settings` - Settings for the sync call. *Note* that those
3020 /// settings will be only used for the first sync call. See the argument
3021 /// docs for [`Client::sync_once`] for more info.
3022 ///
3023 /// * `callback` - A callback that will be called every time after a
3024 /// response has been received, failure or not. The callback returns a
3025 /// `Result<LoopCtrl, Error>`, too. When returning
3026 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3027 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3028 /// function returns `Ok(())`. In case the callback can't handle the
3029 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
3030 /// which results in the sync ending and the `Err(Error)` being returned.
3031 ///
3032 /// # Return
3033 /// The sync runs until an error occurs that the callback can't handle or
3034 /// the callback indicates that the Loop should stop. If the callback
3035 /// asked for a regular stop, the result will be `Ok(())` otherwise the
3036 /// `Err(Error)` is returned.
3037 ///
3038 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
3039 /// this, and are handled first without sending the result to the
3040 /// callback. Only after they have exceeded is the `Result` handed to
3041 /// the callback.
3042 ///
3043 /// # Examples
3044 ///
3045 /// The following example demonstrates how to sync forever while sending all
3046 /// the interesting events through a mpsc channel to another thread e.g. a
3047 /// UI thread.
3048 ///
3049 /// ```no_run
3050 /// # use std::time::Duration;
3051 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3052 /// # use url::Url;
3053 /// # async {
3054 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3055 /// # let mut client = Client::new(homeserver).await.unwrap();
3056 /// #
3057 /// use tokio::sync::mpsc::channel;
3058 ///
3059 /// let (tx, rx) = channel(100);
3060 ///
3061 /// let sync_channel = &tx;
3062 /// let sync_settings = SyncSettings::new()
3063 /// .timeout(Duration::from_secs(30));
3064 ///
3065 /// client
3066 /// .sync_with_result_callback(sync_settings, |response| async move {
3067 /// let channel = sync_channel;
3068 /// let sync_response = response?;
3069 /// for (room_id, room) in sync_response.rooms.joined {
3070 /// for event in room.timeline.events {
3071 /// channel.send(event).await.unwrap();
3072 /// }
3073 /// }
3074 ///
3075 /// Ok(LoopCtrl::Continue)
3076 /// })
3077 /// .await;
3078 /// };
3079 /// ```
3080 #[instrument(skip(self, callback))]
3081 pub async fn sync_with_result_callback<C>(
3082 &self,
3083 sync_settings: crate::config::SyncSettings,
3084 callback: impl Fn(Result<SyncResponse, Error>) -> C,
3085 ) -> Result<(), Error>
3086 where
3087 C: Future<Output = Result<LoopCtrl, Error>>,
3088 {
3089 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3090
3091 while let Some(result) = sync_stream.next().await {
3092 trace!("Running callback");
3093 if callback(result).await? == LoopCtrl::Break {
3094 trace!("Callback told us to stop");
3095 break;
3096 }
3097 trace!("Done running callback");
3098 }
3099
3100 Ok(())
3101 }
3102
3103 //// Repeatedly synchronize the client state with the server.
3104 ///
3105 /// This method will internally call [`Client::sync_once`] in a loop and is
3106 /// equivalent to the [`Client::sync`] method but the responses are provided
3107 /// as an async stream.
3108 ///
3109 /// # Arguments
3110 ///
3111 /// * `sync_settings` - Settings for the sync call. *Note* that those
3112 /// settings will be only used for the first sync call. See the argument
3113 /// docs for [`Client::sync_once`] for more info.
3114 ///
3115 /// # Examples
3116 ///
3117 /// ```no_run
3118 /// # use url::Url;
3119 /// # async {
3120 /// # let homeserver = Url::parse("http://localhost:8080")?;
3121 /// # let username = "";
3122 /// # let password = "";
3123 /// use futures_util::StreamExt;
3124 /// use matrix_sdk::{Client, config::SyncSettings};
3125 ///
3126 /// let client = Client::new(homeserver).await?;
3127 /// client.matrix_auth().login_username(&username, &password).send().await?;
3128 ///
3129 /// let mut sync_stream =
3130 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
3131 ///
3132 /// while let Some(Ok(response)) = sync_stream.next().await {
3133 /// for room in response.rooms.joined.values() {
3134 /// for e in &room.timeline.events {
3135 /// if let Ok(event) = e.raw().deserialize() {
3136 /// println!("Received event {:?}", event);
3137 /// }
3138 /// }
3139 /// }
3140 /// }
3141 ///
3142 /// # anyhow::Ok(()) };
3143 /// ```
3144 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3145 #[instrument(skip(self))]
3146 pub async fn sync_stream(
3147 &self,
3148 mut sync_settings: crate::config::SyncSettings,
3149 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3150 let mut is_first_sync = true;
3151 let mut timeout = None;
3152 let mut last_sync_time: Option<Instant> = None;
3153
3154 let parent_span = Span::current();
3155
3156 async_stream::stream!({
3157 loop {
3158 trace!("Syncing");
3159
3160 if sync_settings.ignore_timeout_on_first_sync {
3161 if is_first_sync {
3162 timeout = sync_settings.timeout.take();
3163 } else if sync_settings.timeout.is_none() && timeout.is_some() {
3164 sync_settings.timeout = timeout.take();
3165 }
3166
3167 is_first_sync = false;
3168 }
3169
3170 yield self
3171 .sync_loop_helper(&mut sync_settings)
3172 .instrument(parent_span.clone())
3173 .await;
3174
3175 Client::delay_sync(&mut last_sync_time).await
3176 }
3177 })
3178 }
3179
3180 /// Get the current, if any, sync token of the client.
3181 /// This will be None if the client didn't sync at least once.
3182 pub(crate) async fn sync_token(&self) -> Option<String> {
3183 self.inner.base_client.sync_token().await
3184 }
3185
3186 /// Gets information about the owner of a given access token.
3187 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3188 let request = whoami::v3::Request::new();
3189 self.send(request).await
3190 }
3191
3192 /// Subscribes a new receiver to client SessionChange broadcasts.
3193 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3194 let broadcast = &self.auth_ctx().session_change_sender;
3195 broadcast.subscribe()
3196 }
3197
3198 /// Sets the save/restore session callbacks.
3199 ///
3200 /// This is another mechanism to get synchronous updates to session tokens,
3201 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3202 pub fn set_session_callbacks(
3203 &self,
3204 reload_session_callback: Box<ReloadSessionCallback>,
3205 save_session_callback: Box<SaveSessionCallback>,
3206 ) -> Result<()> {
3207 self.inner
3208 .auth_ctx
3209 .reload_session_callback
3210 .set(reload_session_callback)
3211 .map_err(|_| Error::MultipleSessionCallbacks)?;
3212
3213 self.inner
3214 .auth_ctx
3215 .save_session_callback
3216 .set(save_session_callback)
3217 .map_err(|_| Error::MultipleSessionCallbacks)?;
3218
3219 Ok(())
3220 }
3221
3222 /// Get the notification settings of the current owner of the client.
3223 pub async fn notification_settings(&self) -> NotificationSettings {
3224 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3225 NotificationSettings::new(self.clone(), ruleset)
3226 }
3227
3228 /// Create a new specialized `Client` that can process notifications.
3229 ///
3230 /// See [`CrossProcessLock::new`] to learn more about
3231 /// `cross_process_lock_config`.
3232 ///
3233 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3234 pub async fn notification_client(
3235 &self,
3236 cross_process_lock_config: CrossProcessLockConfig,
3237 ) -> Result<Client> {
3238 let client = Client {
3239 inner: ClientInner::new(
3240 self.inner.auth_ctx.clone(),
3241 self.server().cloned(),
3242 self.homeserver(),
3243 self.sliding_sync_version(),
3244 self.inner.http_client.clone(),
3245 self.inner
3246 .base_client
3247 .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3248 .await?,
3249 self.inner.caches.supported_versions.value(),
3250 self.inner.caches.well_known.value(),
3251 self.inner.respect_login_well_known,
3252 self.inner.event_cache.clone(),
3253 self.inner.send_queue_data.clone(),
3254 self.inner.latest_events.clone(),
3255 #[cfg(feature = "e2e-encryption")]
3256 self.inner.e2ee.encryption_settings,
3257 #[cfg(feature = "e2e-encryption")]
3258 self.inner.enable_share_history_on_invite,
3259 cross_process_lock_config,
3260 #[cfg(feature = "experimental-search")]
3261 self.inner.search_index.clone(),
3262 self.inner.thread_subscription_catchup.clone(),
3263 self.inner.media_fetcher.clone(),
3264 )
3265 .await,
3266 };
3267
3268 Ok(client)
3269 }
3270
3271 /// The [`EventCache`] instance for this [`Client`].
3272 pub fn event_cache(&self) -> &EventCache {
3273 // SAFETY: always initialized in the `Client` ctor.
3274 self.inner.event_cache.get().unwrap()
3275 }
3276
3277 /// The [`LatestEvents`] instance for this [`Client`].
3278 pub async fn latest_events(&self) -> &LatestEvents {
3279 self.inner
3280 .latest_events
3281 .get_or_init(|| async {
3282 LatestEvents::new(
3283 WeakClient::from_client(self),
3284 self.event_cache().clone(),
3285 SendQueue::new(self.clone()),
3286 self.room_info_notable_update_receiver(),
3287 )
3288 })
3289 .await
3290 }
3291
3292 /// Waits until an at least partially synced room is received, and returns
3293 /// it.
3294 ///
3295 /// **Note: this function will loop endlessly until either it finds the room
3296 /// or an externally set timeout happens.**
3297 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3298 loop {
3299 if let Some(room) = self.get_room(room_id) {
3300 if room.is_state_partially_or_fully_synced() {
3301 debug!("Found just created room!");
3302 return room;
3303 }
3304 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3305 } else {
3306 debug!("Room wasn't found, waiting for sync beat to try again");
3307 }
3308 self.inner.sync_beat.listen().await;
3309 }
3310 }
3311
3312 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3313 /// join it.
3314 pub async fn knock(
3315 &self,
3316 room_id_or_alias: OwnedRoomOrAliasId,
3317 reason: Option<String>,
3318 server_names: Vec<OwnedServerName>,
3319 ) -> Result<Room> {
3320 let request =
3321 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3322 let response = self.send(request).await?;
3323 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3324 Ok(Room::new(self.clone(), base_room))
3325 }
3326
3327 /// Checks whether the provided `user_id` belongs to an ignored user.
3328 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3329 self.base_client().is_user_ignored(user_id).await
3330 }
3331
3332 /// Gets the `max_upload_size` value from the homeserver, getting either a
3333 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3334 /// missing.
3335 ///
3336 /// Check the spec for more info:
3337 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3338 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3339 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3340 if let Some(data) = max_upload_size_lock.get() {
3341 return Ok(data.to_owned());
3342 }
3343
3344 // Use the authenticated endpoint when the server supports it.
3345 let supported_versions = self.supported_versions().await?;
3346 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3347 .is_supported(&supported_versions);
3348
3349 let upload_size = if use_auth {
3350 self.send(authenticated_media::get_media_config::v1::Request::default())
3351 .await?
3352 .upload_size
3353 } else {
3354 #[allow(deprecated)]
3355 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3356 };
3357
3358 match max_upload_size_lock.set(upload_size) {
3359 Ok(_) => Ok(upload_size),
3360 Err(error) => {
3361 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3362 }
3363 }
3364 }
3365
3366 /// The settings to use for decrypting events.
3367 #[cfg(feature = "e2e-encryption")]
3368 pub fn decryption_settings(&self) -> &DecryptionSettings {
3369 &self.base_client().decryption_settings
3370 }
3371
3372 /// Returns the [`SearchIndex`] for this [`Client`].
3373 #[cfg(feature = "experimental-search")]
3374 pub fn search_index(&self) -> &SearchIndex {
3375 &self.inner.search_index
3376 }
3377
3378 /// Whether the client is configured to take thread subscriptions (MSC4306
3379 /// and MSC4308) into account, and the server enabled the experimental
3380 /// feature flag for it.
3381 ///
3382 /// This may cause filtering out of thread subscriptions, and loading the
3383 /// thread subscriptions via the sliding sync extension, when the room
3384 /// list service is being used.
3385 ///
3386 /// This is async and fallible as it may use the network to retrieve the
3387 /// server supported features, if they aren't cached already.
3388 pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3389 // Check if the client is configured to support thread subscriptions first.
3390 match self.base_client().threading_support {
3391 ThreadingSupport::Enabled { with_subscriptions: false }
3392 | ThreadingSupport::Disabled => return Ok(false),
3393 ThreadingSupport::Enabled { with_subscriptions: true } => {}
3394 }
3395
3396 // Now, let's check that the server supports it.
3397 let server_enabled = self
3398 .supported_versions()
3399 .await?
3400 .features
3401 .contains(&FeatureFlag::from("org.matrix.msc4306"));
3402
3403 Ok(server_enabled)
3404 }
3405
3406 /// Fetch thread subscriptions changes between `from` and up to `to`.
3407 ///
3408 /// The `limit` optional parameter can be used to limit the number of
3409 /// entries in a response. It can also be overridden by the server, if
3410 /// it's deemed too large.
3411 pub async fn fetch_thread_subscriptions(
3412 &self,
3413 from: Option<String>,
3414 to: Option<String>,
3415 limit: Option<UInt>,
3416 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3417 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3418 from,
3419 to,
3420 limit,
3421 });
3422 Ok(self.send(request).await?)
3423 }
3424
3425 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3426 self.inner.thread_subscription_catchup.get().unwrap()
3427 }
3428
3429 /// Pause the client for background suspension.
3430 ///
3431 /// This method:
3432 /// 1. Disables all send queues (prevents new message sends).
3433 /// 2. Pauses all database stores, waiting for in-flight operations and
3434 /// releasing all connections and file locks.
3435 ///
3436 /// Call [`Client::resume()`] when the app returns to the foreground.
3437 ///
3438 /// # iOS
3439 ///
3440 /// Call this before the app is suspended to avoid `0xdead10cc` kills.
3441 /// Typically called from
3442 /// [`applicationDidEnterBackground`](https://developer.apple.com/documentation/uikit/uiapplicationdelegate/applicationdidenterbackground(_:))
3443 /// or an equivalent SwiftUI lifecycle event, *after* stopping the
3444 /// `matrix_sdk_ui::sync_service::SyncService`.
3445 pub async fn pause(&self) -> Result<()> {
3446 info!("Client::pause — releasing database resources");
3447
3448 // Disable send queues so no new sends hit the stores.
3449 self.send_queue().set_enabled(false).await;
3450
3451 // Close all stores (waits for in-flight ops, closes connections).
3452 self.base_client().close_stores().await?;
3453
3454 info!("Client::pause — complete, all database connections released");
3455 Ok(())
3456 }
3457
3458 /// Resume the client after a [`Client::pause()`].
3459 ///
3460 /// Re-acquires store resources and re-enables send queues.
3461 ///
3462 /// If your app stopped the `matrix_sdk_ui::sync_service::SyncService`
3463 /// before pausing, restart it separately as appropriate for your app
3464 /// lifecycle.
3465 pub async fn resume(&self) -> Result<()> {
3466 info!("Client::resume — re-acquiring database resources");
3467
3468 // Reopen stores (creates new connection pools).
3469 self.base_client().reopen_stores().await?;
3470
3471 // Re-enable send queues.
3472 self.send_queue().set_enabled(true).await;
3473
3474 info!("Client::resume — complete");
3475 Ok(())
3476 }
3477
3478 /// Perform database optimizations if any are available, i.e. vacuuming in
3479 /// SQLite.
3480 ///
3481 /// **Warning:** this was added to check if SQLite fragmentation was the
3482 /// source of performance issues, **DO NOT use in production**.
3483 #[doc(hidden)]
3484 pub async fn optimize_stores(&self) -> Result<()> {
3485 trace!("Optimizing state store...");
3486 self.state_store().optimize().await?;
3487
3488 trace!("Optimizing event cache store...");
3489 if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3490 clean_lock.optimize().await?;
3491 }
3492
3493 trace!("Optimizing media store...");
3494 self.media_store().lock().await?.optimize().await?;
3495
3496 Ok(())
3497 }
3498
3499 /// Returns the sizes of the existing stores, if known.
3500 pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3501 #[cfg(feature = "e2e-encryption")]
3502 let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3503 && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3504 {
3505 Some(store_size)
3506 } else {
3507 None
3508 };
3509 #[cfg(not(feature = "e2e-encryption"))]
3510 let crypto_store_size = None;
3511
3512 let state_store_size = self.state_store().get_size().await.ok().flatten();
3513
3514 let event_cache_store_size = if let Some(clean_lock) =
3515 self.event_cache_store().lock().await?.as_clean()
3516 && let Ok(Some(store_size)) = clean_lock.get_size().await
3517 {
3518 Some(store_size)
3519 } else {
3520 None
3521 };
3522
3523 let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3524
3525 Ok(StoreSizes {
3526 crypto_store: crypto_store_size,
3527 state_store: state_store_size,
3528 event_cache_store: event_cache_store_size,
3529 media_store: media_store_size,
3530 })
3531 }
3532
3533 /// Get a reference to the client's task monitor, for spawning background
3534 /// tasks.
3535 pub fn task_monitor(&self) -> &TaskMonitor {
3536 &self.inner.task_monitor
3537 }
3538
3539 /// Add a subscriber for duplicate key upload error notifications triggered
3540 /// by requests to /keys/upload.
3541 #[cfg(feature = "e2e-encryption")]
3542 pub fn subscribe_to_duplicate_key_upload_errors(
3543 &self,
3544 ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3545 self.inner.duplicate_key_upload_error_sender.subscribe()
3546 }
3547
3548 /// Check the record of whether we are waiting for an [MSC4268] key bundle
3549 /// for the given room.
3550 ///
3551 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3552 #[cfg(feature = "e2e-encryption")]
3553 pub async fn get_pending_key_bundle_details_for_room(
3554 &self,
3555 room_id: &RoomId,
3556 ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3557 Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3558 }
3559
3560 /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3561 /// a DM.
3562 pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3563 &self.inner.base_client.dm_room_definition
3564 }
3565}
3566
3567/// Contains the disk size of the different stores, if known. It won't be
3568/// available for in-memory stores.
3569#[derive(Debug, Clone)]
3570pub struct StoreSizes {
3571 /// The size of the CryptoStore.
3572 pub crypto_store: Option<usize>,
3573 /// The size of the StateStore.
3574 pub state_store: Option<usize>,
3575 /// The size of the EventCacheStore.
3576 pub event_cache_store: Option<usize>,
3577 /// The size of the MediaStore.
3578 pub media_store: Option<usize>,
3579}
3580
3581#[cfg(any(feature = "testing", test))]
3582impl Client {
3583 /// Test helper to mark users as tracked by the crypto layer.
3584 #[cfg(feature = "e2e-encryption")]
3585 pub async fn update_tracked_users_for_testing(
3586 &self,
3587 user_ids: impl IntoIterator<Item = &UserId>,
3588 ) {
3589 let olm = self.olm_machine().await;
3590 let olm = olm.as_ref().unwrap();
3591 olm.update_tracked_users(user_ids).await.unwrap();
3592 }
3593}
3594
3595/// A weak reference to the inner client, useful when trying to get a handle
3596/// on the owning client.
3597#[derive(Clone, Debug)]
3598pub(crate) struct WeakClient {
3599 client: Weak<ClientInner>,
3600}
3601
3602impl WeakClient {
3603 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3604 pub(crate) fn from_inner(client: &Arc<ClientInner>) -> Self {
3605 Self { client: Arc::downgrade(client) }
3606 }
3607
3608 /// Construct a [`WeakClient`] from a [`Client`].
3609 pub fn from_client(client: &Client) -> Self {
3610 Self::from_inner(&client.inner)
3611 }
3612
3613 /// Attempts to get a [`Client`] from this [`WeakClient`].
3614 pub fn get(&self) -> Option<Client> {
3615 self.client.upgrade().map(|inner| Client { inner })
3616 }
3617
3618 /// Gets the number of strong (`Arc`) pointers still pointing to this
3619 /// client.
3620 #[allow(dead_code)]
3621 pub fn strong_count(&self) -> usize {
3622 self.client.strong_count()
3623 }
3624}
3625
3626/// Information about the state of a room before we joined it.
3627#[derive(Debug, Clone, Default)]
3628struct PreJoinRoomInfo {
3629 /// The user who invited us to the room, if any.
3630 pub inviter: Option<RoomMember>,
3631}
3632
3633// The http mocking library is not supported for wasm32
3634#[cfg(all(test, not(target_family = "wasm")))]
3635pub(crate) mod tests {
3636 use std::{sync::Arc, time::Duration};
3637
3638 use assert_matches::assert_matches;
3639 use assert_matches2::assert_let;
3640 use eyeball::SharedObservable;
3641 use futures_util::{FutureExt, StreamExt, pin_mut};
3642 use js_int::{UInt, uint};
3643 use matrix_sdk_base::{
3644 RoomState,
3645 store::{MemoryStore, StoreConfig},
3646 ttl::TtlValue,
3647 };
3648 use matrix_sdk_test::{
3649 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3650 event_factory::EventFactory,
3651 };
3652 #[cfg(target_family = "wasm")]
3653 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3654
3655 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3656 use ruma::{
3657 RoomId, ServerName, UserId,
3658 api::{
3659 FeatureFlag, MatrixVersion,
3660 client::{room::create_room::v3::Request as CreateRoomRequest, rtc::RtcTransport},
3661 },
3662 assign,
3663 events::{
3664 ignored_user_list::IgnoredUserListEventContent,
3665 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3666 },
3667 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3668 };
3669 use serde_json::json;
3670 use stream_assert::{assert_next_matches, assert_pending};
3671 use tokio::{
3672 spawn,
3673 time::{sleep, timeout},
3674 };
3675 use url::Url;
3676
3677 use super::Client;
3678 use crate::{
3679 Error, TransmissionProgress,
3680 client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3681 config::{RequestConfig, SyncSettings},
3682 futures::SendRequest,
3683 media::MediaError,
3684 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3685 };
3686
3687 #[async_test]
3688 async fn test_account_data() {
3689 let server = MatrixMockServer::new().await;
3690 let client = server.client_builder().build().await;
3691
3692 let f = EventFactory::new();
3693 server
3694 .mock_sync()
3695 .ok_and_run(&client, |builder| {
3696 builder.add_global_account_data(
3697 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3698 );
3699 })
3700 .await;
3701
3702 let content = client
3703 .account()
3704 .account_data::<IgnoredUserListEventContent>()
3705 .await
3706 .unwrap()
3707 .unwrap()
3708 .deserialize()
3709 .unwrap();
3710
3711 assert_eq!(content.ignored_users.len(), 1);
3712 }
3713
3714 #[async_test]
3715 async fn test_successful_discovery() {
3716 // Imagine this is `matrix.org`.
3717 let server = MatrixMockServer::new().await;
3718 let server_url = server.uri();
3719
3720 // Imagine this is `matrix-client.matrix.org`.
3721 let homeserver = MatrixMockServer::new().await;
3722 let homeserver_url = homeserver.uri();
3723
3724 // Imagine Alice has the user ID `@alice:matrix.org`.
3725 let domain = server_url.strip_prefix("http://").unwrap();
3726 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3727
3728 // The `.well-known` is on the server (e.g. `matrix.org`).
3729 server
3730 .mock_well_known()
3731 .ok_with_homeserver_url(&homeserver_url)
3732 .mock_once()
3733 .named("well-known")
3734 .mount()
3735 .await;
3736
3737 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3738 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3739
3740 let client = Client::builder()
3741 .insecure_server_name_no_tls(alice.server_name())
3742 .build()
3743 .await
3744 .unwrap();
3745
3746 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3747 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3748 client.server_versions().await.unwrap();
3749 }
3750
3751 #[async_test]
3752 async fn test_discovery_broken_server() {
3753 let server = MatrixMockServer::new().await;
3754 let server_url = server.uri();
3755 let domain = server_url.strip_prefix("http://").unwrap();
3756 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3757
3758 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3759
3760 assert!(
3761 Client::builder()
3762 .insecure_server_name_no_tls(alice.server_name())
3763 .build()
3764 .await
3765 .is_err(),
3766 "Creating a client from a user ID should fail when the .well-known request fails."
3767 );
3768 }
3769
3770 #[async_test]
3771 async fn test_room_creation() {
3772 let server = MatrixMockServer::new().await;
3773 let client = server.client_builder().build().await;
3774
3775 let f = EventFactory::new().sender(user_id!("@example:localhost"));
3776 server
3777 .mock_sync()
3778 .ok_and_run(&client, |builder| {
3779 builder.add_joined_room(
3780 JoinedRoomBuilder::default()
3781 .add_state_event(
3782 f.member(user_id!("@example:localhost")).display_name("example"),
3783 )
3784 .add_state_event(f.default_power_levels()),
3785 );
3786 })
3787 .await;
3788
3789 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3790 assert_eq!(room.state(), RoomState::Joined);
3791 }
3792
3793 #[async_test]
3794 async fn test_retry_limit_http_requests() {
3795 let server = MatrixMockServer::new().await;
3796 let client = server
3797 .client_builder()
3798 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3799 .build()
3800 .await;
3801
3802 assert!(client.request_config().retry_limit.unwrap() == 4);
3803
3804 server.mock_who_am_i().error500().expect(4).mount().await;
3805
3806 client.whoami().await.unwrap_err();
3807 }
3808
3809 #[async_test]
3810 async fn test_retry_timeout_http_requests() {
3811 // Keep this timeout small so that the test doesn't take long
3812 let retry_timeout = Duration::from_secs(5);
3813 let server = MatrixMockServer::new().await;
3814 let client = server
3815 .client_builder()
3816 .on_builder(|builder| {
3817 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3818 })
3819 .build()
3820 .await;
3821
3822 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3823
3824 server.mock_login().error500().expect(2..).mount().await;
3825
3826 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3827 }
3828
3829 #[async_test]
3830 async fn test_short_retry_initial_http_requests() {
3831 let server = MatrixMockServer::new().await;
3832 let client = server
3833 .client_builder()
3834 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3835 .build()
3836 .await;
3837
3838 server.mock_login().error500().expect(3..).mount().await;
3839
3840 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3841 }
3842
3843 #[async_test]
3844 async fn test_no_retry_http_requests() {
3845 let server = MatrixMockServer::new().await;
3846 let client = server.client_builder().build().await;
3847
3848 server.mock_devices().error500().mock_once().mount().await;
3849
3850 client.devices().await.unwrap_err();
3851 }
3852
3853 #[async_test]
3854 async fn test_set_homeserver() {
3855 let client = MockClientBuilder::new(None).build().await;
3856 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3857
3858 let homeserver = Url::parse("http://example.com/").unwrap();
3859 client.set_homeserver(homeserver.clone());
3860 assert_eq!(client.homeserver(), homeserver);
3861 }
3862
3863 #[async_test]
3864 async fn test_search_user_request() {
3865 let server = MatrixMockServer::new().await;
3866 let client = server.client_builder().build().await;
3867
3868 server.mock_user_directory().ok().mock_once().mount().await;
3869
3870 let response = client.search_users("test", 50).await.unwrap();
3871 assert_eq!(response.results.len(), 1);
3872 let result = &response.results[0];
3873 assert_eq!(result.user_id.to_string(), "@test:example.me");
3874 assert_eq!(result.display_name.clone().unwrap(), "Test");
3875 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3876 assert!(!response.limited);
3877 }
3878
3879 #[async_test]
3880 async fn test_request_unstable_features() {
3881 let server = MatrixMockServer::new().await;
3882 let client = server.client_builder().no_server_versions().build().await;
3883
3884 server
3885 .mock_versions()
3886 .with_feature("org.matrix.e2e_cross_signing", true)
3887 .ok()
3888 .mock_once()
3889 .mount()
3890 .await;
3891
3892 let unstable_features = client.unstable_features().await.unwrap();
3893 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3894 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3895 }
3896
3897 #[async_test]
3898 async fn test_can_homeserver_push_encrypted_event_to_device() {
3899 let server = MatrixMockServer::new().await;
3900 let client = server.client_builder().no_server_versions().build().await;
3901
3902 server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3903
3904 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3905 assert!(msc4028_enabled);
3906 }
3907
3908 #[async_test]
3909 async fn test_recently_visited_rooms() {
3910 // Tracking recently visited rooms requires authentication
3911 let client = MockClientBuilder::new(None).unlogged().build().await;
3912 assert_matches!(
3913 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3914 Err(Error::AuthenticationRequired)
3915 );
3916
3917 let client = MockClientBuilder::new(None).build().await;
3918 let account = client.account();
3919
3920 // We should start off with an empty list
3921 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3922
3923 // Tracking a valid room id should add it to the list
3924 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3925 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3926 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3927
3928 // And the existing list shouldn't be changed
3929 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3930 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3931
3932 // Tracking the same room again shouldn't change the list
3933 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3934 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3935 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3936
3937 // Tracking a second room should add it to the front of the list
3938 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3939 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3940 assert_eq!(
3941 account.get_recently_visited_rooms().await.unwrap(),
3942 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3943 );
3944
3945 // Tracking the first room yet again should move it to the front of the list
3946 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3947 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3948 assert_eq!(
3949 account.get_recently_visited_rooms().await.unwrap(),
3950 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3951 );
3952
3953 // Tracking should be capped at 20
3954 for n in 0..20 {
3955 account
3956 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3957 .await
3958 .unwrap();
3959 }
3960
3961 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3962
3963 // And the initial rooms should've been pushed out
3964 let rooms = account.get_recently_visited_rooms().await.unwrap();
3965 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3966 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3967
3968 // And the last tracked room should be the first
3969 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3970 }
3971
3972 #[async_test]
3973 async fn test_client_no_cycle_with_event_cache() {
3974 let client = MockClientBuilder::new(None).build().await;
3975
3976 // Wait for the init tasks to die.
3977 sleep(Duration::from_secs(1)).await;
3978
3979 let weak_client = WeakClient::from_client(&client);
3980 assert_eq!(weak_client.strong_count(), 1);
3981
3982 {
3983 let room_id = room_id!("!room:example.org");
3984
3985 // Have the client know the room.
3986 let response = SyncResponseBuilder::default()
3987 .add_joined_room(JoinedRoomBuilder::new(room_id))
3988 .build_sync_response();
3989 client.inner.base_client.receive_sync_response(response).await.unwrap();
3990
3991 client.event_cache().subscribe().unwrap();
3992
3993 let (_room_event_cache, _drop_handles) =
3994 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3995 }
3996
3997 drop(client);
3998
3999 // Give a bit of time for background tasks to die.
4000 sleep(Duration::from_secs(1)).await;
4001
4002 // The weak client must be the last reference to the client now.
4003 assert_eq!(weak_client.strong_count(), 0);
4004 let client = weak_client.get();
4005 assert!(
4006 client.is_none(),
4007 "too many strong references to the client: {}",
4008 Arc::strong_count(&client.unwrap().inner)
4009 );
4010 }
4011
4012 #[async_test]
4013 async fn test_supported_versions_caching() {
4014 let server = MatrixMockServer::new().await;
4015
4016 let versions_mock = server
4017 .mock_versions()
4018 .expect_default_access_token()
4019 .with_feature("org.matrix.e2e_cross_signing", true)
4020 .ok()
4021 .named("first versions mock")
4022 .expect(1)
4023 .mount_as_scoped()
4024 .await;
4025
4026 let memory_store = Arc::new(MemoryStore::new());
4027 let client = server
4028 .client_builder()
4029 .no_server_versions()
4030 .on_builder(|builder| {
4031 builder.store_config(
4032 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4033 .state_store(memory_store.clone()),
4034 )
4035 })
4036 .build()
4037 .await;
4038
4039 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4040
4041 // The result was cached.
4042 assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
4043 // This subsequent call hits the in-memory cache.
4044 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4045
4046 drop(client);
4047
4048 let client = server
4049 .client_builder()
4050 .no_server_versions()
4051 .on_builder(|builder| {
4052 builder.store_config(
4053 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4054 .state_store(memory_store.clone()),
4055 )
4056 })
4057 .build()
4058 .await;
4059
4060 // These calls to the new client hit the on-disk cache.
4061 assert!(
4062 client
4063 .unstable_features()
4064 .await
4065 .unwrap()
4066 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
4067 );
4068
4069 let supported = client.supported_versions().await.unwrap();
4070 assert!(supported.versions.contains(&MatrixVersion::V1_0));
4071 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4072
4073 // Then this call hits the in-memory cache.
4074 let supported = client.supported_versions().await.unwrap();
4075 assert!(supported.versions.contains(&MatrixVersion::V1_0));
4076 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4077
4078 drop(versions_mock);
4079
4080 // Now, reset the cache, and observe the endpoint being called again once.
4081 client.reset_supported_versions().await.unwrap();
4082
4083 server.mock_versions().ok().expect(2).named("second versions mock").mount().await;
4084
4085 // Hits network again.
4086 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4087 // Hits in-memory cache again.
4088 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4089 assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4090
4091 // Force an expiry of the data.
4092 let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
4093 let mut ttl_value = TtlValue::new(supported_versions);
4094 ttl_value.expire();
4095 client.inner.caches.supported_versions.set_value(ttl_value);
4096
4097 // Call the method to trigger a cache refresh background task.
4098 client.supported_versions_cached().await.unwrap().unwrap();
4099
4100 // We wait for the task to finish, the endpoint should have been called again.
4101 sleep(Duration::from_secs(1)).await;
4102 assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4103 }
4104
4105 #[async_test]
4106 async fn test_well_known_caching() {
4107 let server = MatrixMockServer::new().await;
4108 let server_url = server.uri();
4109 let domain = server_url.strip_prefix("http://").unwrap();
4110 let server_name = <&ServerName>::try_from(domain).unwrap();
4111 let rtc_foci = vec![RtcTransport::livekit("https://livekit.example.com".to_owned())];
4112
4113 let well_known_mock = server
4114 .mock_well_known()
4115 .ok()
4116 .named("well known mock")
4117 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
4118 .mount_as_scoped()
4119 .await;
4120
4121 let memory_store = Arc::new(MemoryStore::new());
4122 let client = Client::builder()
4123 .insecure_server_name_no_tls(server_name)
4124 .store_config(
4125 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4126 .state_store(memory_store.clone()),
4127 )
4128 .build()
4129 .await
4130 .unwrap();
4131
4132 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4133
4134 // This subsequent call hits the in-memory cache.
4135 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4136
4137 drop(client);
4138
4139 let client = server
4140 .client_builder()
4141 .no_server_versions()
4142 .on_builder(|builder| {
4143 builder.store_config(
4144 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4145 .state_store(memory_store.clone()),
4146 )
4147 })
4148 .build()
4149 .await;
4150
4151 // This call to the new client hits the on-disk cache.
4152 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4153
4154 // Then this call hits the in-memory cache.
4155 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4156
4157 drop(well_known_mock);
4158
4159 // Now, reset the cache, and observe the endpoints being called again once.
4160 client.reset_well_known().await.unwrap();
4161
4162 server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
4163
4164 // Hits network again.
4165 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4166 // Hits in-memory cache again.
4167 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4168
4169 // Force an expiry of the data.
4170 let well_known = client.well_known().await;
4171 let mut ttl_value = TtlValue::new(well_known);
4172 ttl_value.expire();
4173 client.inner.caches.well_known.set_value(ttl_value);
4174
4175 // Call the method again to trigger a cache refresh background task.
4176 client.well_known().await;
4177
4178 // We wait for the task to finish, the endpoint should have been called again.
4179 // We need to wait a bit because the first requests using the server name of the
4180 // user will fail, only the requests using the homeserver URL will succeed.
4181 sleep(Duration::from_secs(5)).await;
4182 assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
4183 }
4184
4185 #[async_test]
4186 async fn test_missing_well_known_caching() {
4187 let server = MatrixMockServer::new().await;
4188 let rtc_foci: Vec<RtcTransport> = vec![];
4189
4190 let well_known_mock = server
4191 .mock_well_known()
4192 .error_unrecognized()
4193 .named("first well-known mock")
4194 .expect(1)
4195 .mount_as_scoped()
4196 .await;
4197
4198 let memory_store = Arc::new(MemoryStore::new());
4199 let client = server
4200 .client_builder()
4201 .on_builder(|builder| {
4202 builder.store_config(
4203 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4204 .state_store(memory_store.clone()),
4205 )
4206 })
4207 .build()
4208 .await;
4209
4210 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4211
4212 // This subsequent call hits the in-memory cache.
4213 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4214
4215 drop(client);
4216
4217 let client = server
4218 .client_builder()
4219 .on_builder(|builder| {
4220 builder.store_config(
4221 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4222 .state_store(memory_store.clone()),
4223 )
4224 })
4225 .build()
4226 .await;
4227
4228 // This call to the new client hits the on-disk cache.
4229 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4230
4231 // Then this call hits the in-memory cache.
4232 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4233
4234 drop(well_known_mock);
4235
4236 // Now, reset the cache, and observe the endpoints being called again once.
4237 client.reset_well_known().await.unwrap();
4238
4239 server
4240 .mock_well_known()
4241 .error_unrecognized()
4242 .expect(1)
4243 .named("second well-known mock")
4244 .mount()
4245 .await;
4246
4247 // Hits network again.
4248 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4249 // Hits in-memory cache again.
4250 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4251 }
4252
4253 #[async_test]
4254 async fn test_no_network_doesnt_cause_infinite_retries() {
4255 // We want infinite retries for transient errors.
4256 let client = MockClientBuilder::new(None)
4257 .on_builder(|builder| builder.request_config(RequestConfig::new()))
4258 .build()
4259 .await;
4260
4261 // We don't define a mock server on purpose here, so that the error is really a
4262 // network error.
4263 client.whoami().await.unwrap_err();
4264 }
4265
4266 #[async_test]
4267 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4268 let server = MatrixMockServer::new().await;
4269 let client = server.client_builder().build().await;
4270
4271 let room_id = room_id!("!room:example.org");
4272
4273 server
4274 .mock_sync()
4275 .ok_and_run(&client, |builder| {
4276 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4277 })
4278 .await;
4279
4280 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4281 assert_eq!(room.room_id(), room_id);
4282 }
4283
4284 #[async_test]
4285 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4286 let server = MatrixMockServer::new().await;
4287 let client = server.client_builder().build().await;
4288
4289 let room_id = room_id!("!room:example.org");
4290
4291 let client = Arc::new(client);
4292
4293 // Perform the /sync request with a delay so it starts after the
4294 // `await_room_remote_echo` call has happened
4295 spawn({
4296 let client = client.clone();
4297 async move {
4298 sleep(Duration::from_millis(100)).await;
4299
4300 server
4301 .mock_sync()
4302 .ok_and_run(&client, |builder| {
4303 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4304 })
4305 .await;
4306 }
4307 });
4308
4309 let room =
4310 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4311 assert_eq!(room.room_id(), room_id);
4312 }
4313
4314 #[async_test]
4315 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4316 let client = MockClientBuilder::new(None).build().await;
4317
4318 let room_id = room_id!("!room:example.org");
4319 // Room is not present so the client won't be able to find it. The call will
4320 // timeout.
4321 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4322 }
4323
4324 #[async_test]
4325 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4326 let server = MatrixMockServer::new().await;
4327 let client = server.client_builder().build().await;
4328
4329 server.mock_create_room().ok().mount().await;
4330
4331 // Create a room in the internal store
4332 let room = client
4333 .create_room(assign!(CreateRoomRequest::new(), {
4334 invite: vec![],
4335 is_direct: false,
4336 }))
4337 .await
4338 .unwrap();
4339
4340 // Room is locally present, but not synced, the call will timeout
4341 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4342 .await
4343 .unwrap_err();
4344 }
4345
4346 #[async_test]
4347 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4348 let server = MatrixMockServer::new().await;
4349 let client = server.client_builder().build().await;
4350
4351 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4352
4353 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4354 assert_matches!(ret, Ok(true));
4355 }
4356
4357 #[async_test]
4358 async fn test_is_room_alias_available_if_alias_is_resolved() {
4359 let server = MatrixMockServer::new().await;
4360 let client = server.client_builder().build().await;
4361
4362 server
4363 .mock_room_directory_resolve_alias()
4364 .ok("!some_room_id:matrix.org", Vec::new())
4365 .expect(1)
4366 .mount()
4367 .await;
4368
4369 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4370 assert_matches!(ret, Ok(false));
4371 }
4372
4373 #[async_test]
4374 async fn test_is_room_alias_available_if_error_found() {
4375 let server = MatrixMockServer::new().await;
4376 let client = server.client_builder().build().await;
4377
4378 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4379
4380 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4381 assert_matches!(ret, Err(_));
4382 }
4383
4384 #[async_test]
4385 async fn test_create_room_alias() {
4386 let server = MatrixMockServer::new().await;
4387 let client = server.client_builder().build().await;
4388
4389 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4390
4391 let ret = client
4392 .create_room_alias(
4393 room_alias_id!("#some_alias:matrix.org"),
4394 room_id!("!some_room:matrix.org"),
4395 )
4396 .await;
4397 assert_matches!(ret, Ok(()));
4398 }
4399
4400 #[async_test]
4401 async fn test_join_room_by_id_or_alias() {
4402 use wiremock::{
4403 Mock, ResponseTemplate,
4404 matchers::{method, path_regex},
4405 };
4406 let server = MatrixMockServer::new().await;
4407 let client = server.client_builder().build().await;
4408
4409 let target_room_id = room_id!("!some_id:matrix.org");
4410 let target_alias = room_alias_id!("#some_alias:matrix.org");
4411
4412 Mock::given(method("POST"))
4413 .and(path_regex("^/_matrix/client/v3/join/.*$"))
4414 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
4415 "room_id": target_room_id
4416 })))
4417 .mount(server.server())
4418 .await;
4419
4420 server
4421 .mock_room_directory_resolve_alias()
4422 .ok(target_room_id.as_str(), Vec::new())
4423 .mount()
4424 .await;
4425
4426 server.mock_room_join(target_room_id).ok().mount().await;
4427
4428 let ret = client.join_room_by_id_or_alias(target_alias.into(), &[]).await;
4429 assert!(ret.is_ok());
4430
4431 let ret = client.join_room_by_id_or_alias(target_room_id.into(), &[]).await;
4432 assert!(ret.is_ok());
4433 }
4434
4435 #[async_test]
4436 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4437 let server = MatrixMockServer::new().await;
4438 let client = server.client_builder().build().await;
4439
4440 let room_id = room_id!("!a-room:matrix.org");
4441
4442 // Make sure the summary endpoint is called once
4443 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4444
4445 // We create a locally cached invited room
4446 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4447
4448 // And we get a preview, the server endpoint was reached
4449 let preview = client
4450 .get_room_preview(room_id.into(), Vec::new())
4451 .await
4452 .expect("Room preview should be retrieved");
4453
4454 assert_eq!(invited_room.room_id(), preview.room_id);
4455 }
4456
4457 #[async_test]
4458 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4459 let server = MatrixMockServer::new().await;
4460 let client = server.client_builder().build().await;
4461
4462 let room_id = room_id!("!a-room:matrix.org");
4463
4464 // Make sure the summary endpoint is called once
4465 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4466
4467 // We create a locally cached left room
4468 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4469
4470 // And we get a preview, the server endpoint was reached
4471 let preview = client
4472 .get_room_preview(room_id.into(), Vec::new())
4473 .await
4474 .expect("Room preview should be retrieved");
4475
4476 assert_eq!(left_room.room_id(), preview.room_id);
4477 }
4478
4479 #[async_test]
4480 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4481 let server = MatrixMockServer::new().await;
4482 let client = server.client_builder().build().await;
4483
4484 let room_id = room_id!("!a-room:matrix.org");
4485
4486 // Make sure the summary endpoint is called once
4487 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4488
4489 // We create a locally cached knocked room
4490 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4491
4492 // And we get a preview, the server endpoint was reached
4493 let preview = client
4494 .get_room_preview(room_id.into(), Vec::new())
4495 .await
4496 .expect("Room preview should be retrieved");
4497
4498 assert_eq!(knocked_room.room_id(), preview.room_id);
4499 }
4500
4501 #[async_test]
4502 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4503 let server = MatrixMockServer::new().await;
4504 let client = server.client_builder().build().await;
4505
4506 let room_id = room_id!("!a-room:matrix.org");
4507
4508 // Make sure the summary endpoint is not called
4509 server.mock_room_summary().ok(room_id).never().mount().await;
4510
4511 // We create a locally cached joined room
4512 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4513
4514 // And we get a preview, no server endpoint was reached
4515 let preview = client
4516 .get_room_preview(room_id.into(), Vec::new())
4517 .await
4518 .expect("Room preview should be retrieved");
4519
4520 assert_eq!(joined_room.room_id(), preview.room_id);
4521 }
4522
4523 #[async_test]
4524 async fn test_media_preview_config() {
4525 let server = MatrixMockServer::new().await;
4526 let client = server.client_builder().build().await;
4527
4528 server
4529 .mock_sync()
4530 .ok_and_run(&client, |builder| {
4531 builder.add_custom_global_account_data(json!({
4532 "content": {
4533 "media_previews": "private",
4534 "invite_avatars": "off"
4535 },
4536 "type": "m.media_preview_config"
4537 }));
4538 })
4539 .await;
4540
4541 let (initial_value, stream) =
4542 client.account().observe_media_preview_config().await.unwrap();
4543
4544 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4545 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4546 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4547 pin_mut!(stream);
4548 assert_pending!(stream);
4549
4550 server
4551 .mock_sync()
4552 .ok_and_run(&client, |builder| {
4553 builder.add_custom_global_account_data(json!({
4554 "content": {
4555 "media_previews": "off",
4556 "invite_avatars": "on"
4557 },
4558 "type": "m.media_preview_config"
4559 }));
4560 })
4561 .await;
4562
4563 assert_next_matches!(
4564 stream,
4565 MediaPreviewConfigEventContent {
4566 media_previews: Some(MediaPreviews::Off),
4567 invite_avatars: Some(InviteAvatars::On),
4568 ..
4569 }
4570 );
4571 assert_pending!(stream);
4572 }
4573
4574 #[async_test]
4575 async fn test_unstable_media_preview_config() {
4576 let server = MatrixMockServer::new().await;
4577 let client = server.client_builder().build().await;
4578
4579 server
4580 .mock_sync()
4581 .ok_and_run(&client, |builder| {
4582 builder.add_custom_global_account_data(json!({
4583 "content": {
4584 "media_previews": "private",
4585 "invite_avatars": "off"
4586 },
4587 "type": "io.element.msc4278.media_preview_config"
4588 }));
4589 })
4590 .await;
4591
4592 let (initial_value, stream) =
4593 client.account().observe_media_preview_config().await.unwrap();
4594
4595 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4596 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4597 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4598 pin_mut!(stream);
4599 assert_pending!(stream);
4600
4601 server
4602 .mock_sync()
4603 .ok_and_run(&client, |builder| {
4604 builder.add_custom_global_account_data(json!({
4605 "content": {
4606 "media_previews": "off",
4607 "invite_avatars": "on"
4608 },
4609 "type": "io.element.msc4278.media_preview_config"
4610 }));
4611 })
4612 .await;
4613
4614 assert_next_matches!(
4615 stream,
4616 MediaPreviewConfigEventContent {
4617 media_previews: Some(MediaPreviews::Off),
4618 invite_avatars: Some(InviteAvatars::On),
4619 ..
4620 }
4621 );
4622 assert_pending!(stream);
4623 }
4624
4625 #[async_test]
4626 async fn test_media_preview_config_not_found() {
4627 let server = MatrixMockServer::new().await;
4628 let client = server.client_builder().build().await;
4629
4630 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4631
4632 assert!(initial_value.is_none());
4633 }
4634
4635 #[async_test]
4636 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4637 // The default Matrix version we use is 1.11 or higher, so authenticated media
4638 // is supported.
4639 let server = MatrixMockServer::new().await;
4640 let client = server.client_builder().build().await;
4641
4642 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4643
4644 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4645 client.load_or_fetch_max_upload_size().await.unwrap();
4646
4647 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4648 }
4649
4650 #[async_test]
4651 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4652 // The server must advertise support for the stable feature for authenticated
4653 // media support, so we mock the `GET /versions` response.
4654 let server = MatrixMockServer::new().await;
4655 let client = server.client_builder().no_server_versions().build().await;
4656
4657 server
4658 .mock_versions()
4659 .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4660 .with_feature("org.matrix.msc3916.stable", true)
4661 .ok()
4662 .named("versions")
4663 .expect(1)
4664 .mount()
4665 .await;
4666
4667 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4668
4669 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4670 client.load_or_fetch_max_upload_size().await.unwrap();
4671
4672 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4673 }
4674
4675 #[async_test]
4676 async fn test_load_or_fetch_max_upload_size_no_auth() {
4677 // The server must not support Matrix 1.11 or higher for unauthenticated
4678 // media requests, so we mock the `GET /versions` response.
4679 let server = MatrixMockServer::new().await;
4680 let client = server.client_builder().no_server_versions().build().await;
4681
4682 server
4683 .mock_versions()
4684 .with_versions(vec!["v1.1"])
4685 .ok()
4686 .named("versions")
4687 .expect(1)
4688 .mount()
4689 .await;
4690
4691 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4692
4693 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4694 client.load_or_fetch_max_upload_size().await.unwrap();
4695
4696 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4697 }
4698
4699 #[async_test]
4700 async fn test_uploading_a_too_large_media_file() {
4701 let server = MatrixMockServer::new().await;
4702 let client = server.client_builder().build().await;
4703
4704 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4705 client.load_or_fetch_max_upload_size().await.unwrap();
4706 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4707
4708 let data = vec![1, 2];
4709 let upload_request =
4710 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4711 let request = SendRequest {
4712 client: client.clone(),
4713 request: upload_request,
4714 config: None,
4715 send_progress: SharedObservable::new(TransmissionProgress::default()),
4716 };
4717 let media_request = SendMediaUploadRequest::new(request);
4718
4719 let error = media_request.await.err();
4720 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4721 assert_eq!(max, uint!(1));
4722 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4723 }
4724
4725 #[async_test]
4726 async fn test_dont_ignore_timeout_on_first_sync() {
4727 let server = MatrixMockServer::new().await;
4728 let client = server.client_builder().build().await;
4729
4730 server
4731 .mock_sync()
4732 .timeout(Some(Duration::from_secs(30)))
4733 .ok(|_| {})
4734 .mock_once()
4735 .named("sync_with_timeout")
4736 .mount()
4737 .await;
4738
4739 // Call the endpoint once to check the timeout.
4740 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4741
4742 timeout(Duration::from_secs(1), async {
4743 stream.next().await.unwrap().unwrap();
4744 })
4745 .await
4746 .unwrap();
4747 }
4748
4749 #[async_test]
4750 async fn test_ignore_timeout_on_first_sync() {
4751 let server = MatrixMockServer::new().await;
4752 let client = server.client_builder().build().await;
4753
4754 server
4755 .mock_sync()
4756 .timeout(None)
4757 .ok(|_| {})
4758 .mock_once()
4759 .named("sync_no_timeout")
4760 .mount()
4761 .await;
4762 server
4763 .mock_sync()
4764 .timeout(Some(Duration::from_secs(30)))
4765 .ok(|_| {})
4766 .mock_once()
4767 .named("sync_with_timeout")
4768 .mount()
4769 .await;
4770
4771 // Call each version of the endpoint once to check the timeouts.
4772 let mut stream = Box::pin(
4773 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4774 );
4775
4776 timeout(Duration::from_secs(1), async {
4777 stream.next().await.unwrap().unwrap();
4778 stream.next().await.unwrap().unwrap();
4779 })
4780 .await
4781 .unwrap();
4782 }
4783
4784 #[async_test]
4785 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4786 let server = MatrixMockServer::new().await;
4787 let client = server.client_builder().build().await;
4788 // This is the user ID that is inside MemberAdditional.
4789 // Note the confusing username, so we can share
4790 // GlobalAccountDataTestEvent::Direct with the invited test.
4791 let user_id = user_id!("@invited:localhost");
4792
4793 // When we receive a sync response saying "invited" is invited to a DM
4794 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4795 let response = SyncResponseBuilder::default()
4796 .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4797 .add_global_account_data(
4798 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4799 )
4800 .build_sync_response();
4801 client.base_client().receive_sync_response(response).await.unwrap();
4802
4803 // Then get_dm_room finds this room
4804 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4805 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4806 }
4807
4808 #[async_test]
4809 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4810 let server = MatrixMockServer::new().await;
4811 let client = server.client_builder().build().await;
4812 // This is the user ID that is inside MemberInvite
4813 let user_id = user_id!("@invited:localhost");
4814
4815 // When we receive a sync response saying "invited" is invited to a DM
4816 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4817 let response = SyncResponseBuilder::default()
4818 .add_joined_room(
4819 JoinedRoomBuilder::default()
4820 .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4821 )
4822 .add_global_account_data(
4823 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4824 )
4825 .build_sync_response();
4826 client.base_client().receive_sync_response(response).await.unwrap();
4827
4828 // Then get_dm_room finds this room
4829 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4830 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4831 }
4832
4833 #[async_test]
4834 async fn test_get_dm_room_still_finds_left_room() {
4835 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4836 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4837
4838 let server = MatrixMockServer::new().await;
4839 let client = server.client_builder().build().await;
4840 // This is the user ID that is inside MemberAdditional.
4841 // Note the confusing username, so we can share
4842 // GlobalAccountDataTestEvent::Direct with the invited test.
4843 let user_id = user_id!("@invited:localhost");
4844
4845 // When we receive a sync response saying "invited" has left a DM
4846 let f = EventFactory::new().sender(user_id);
4847 let response = SyncResponseBuilder::default()
4848 .add_joined_room(
4849 JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4850 )
4851 .add_global_account_data(
4852 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4853 )
4854 .build_sync_response();
4855 client.base_client().receive_sync_response(response).await.unwrap();
4856
4857 // Then get_dm_room finds this room
4858 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4859 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4860 }
4861
4862 #[async_test]
4863 async fn test_device_exists() {
4864 let server = MatrixMockServer::new().await;
4865 let client = server.client_builder().build().await;
4866
4867 server.mock_get_device().ok().expect(1).mount().await;
4868
4869 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4870 }
4871
4872 #[async_test]
4873 async fn test_device_exists_404() {
4874 let server = MatrixMockServer::new().await;
4875 let client = server.client_builder().build().await;
4876
4877 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4878 }
4879
4880 #[async_test]
4881 async fn test_device_exists_500() {
4882 let server = MatrixMockServer::new().await;
4883 let client = server.client_builder().build().await;
4884
4885 server.mock_get_device().error500().expect(1).mount().await;
4886
4887 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4888 }
4889
4890 #[async_test]
4891 async fn test_fetching_well_known_with_homeserver_url() {
4892 let server = MatrixMockServer::new().await;
4893 let client = server.client_builder().build().await;
4894 server.mock_well_known().ok().mount().await;
4895
4896 assert_matches!(client.fetch_client_well_known().await, Some(_));
4897 }
4898
4899 #[async_test]
4900 async fn test_fetching_well_known_with_server_name() {
4901 let server = MatrixMockServer::new().await;
4902 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4903
4904 server.mock_well_known().ok().mount().await;
4905
4906 let client = MockClientBuilder::new(None)
4907 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4908 .build()
4909 .await;
4910
4911 assert_matches!(client.fetch_client_well_known().await, Some(_));
4912 }
4913
4914 #[async_test]
4915 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4916 let server = MatrixMockServer::new().await;
4917 server.mock_well_known().ok().mount().await;
4918
4919 let user_id =
4920 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4921 let client = MockClientBuilder::new(None)
4922 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4923 .build()
4924 .await;
4925
4926 assert_matches!(client.fetch_client_well_known().await, Some(_));
4927 }
4928}