matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
33use matrix_sdk_base::{
34 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
35 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
36 event_cache::store::EventCacheStoreLock,
37 media::store::MediaStoreLock,
38 store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
39 sync::{Notification, RoomUpdates},
40};
41use matrix_sdk_common::ttl_cache::TtlCache;
42#[cfg(feature = "e2e-encryption")]
43use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
44use ruma::{
45 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
46 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
47 api::{
48 FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
49 client::{
50 account::whoami,
51 alias::{create_alias, delete_alias, get_alias},
52 authenticated_media,
53 device::{delete_devices, get_devices, update_device},
54 directory::{get_public_rooms, get_public_rooms_filtered},
55 discovery::{
56 discover_homeserver::{self, RtcFocusInfo},
57 get_capabilities::{self, v3::Capabilities},
58 get_supported_versions,
59 },
60 error::ErrorKind,
61 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
62 knock::knock_room,
63 media,
64 membership::{join_room_by_id, join_room_by_id_or_alias},
65 room::create_room,
66 session::login::v3::DiscoveryInfo,
67 sync::sync_events,
68 threads::get_thread_subscriptions_changes,
69 uiaa,
70 user_directory::search_users,
71 },
72 error::FromHttpResponseError,
73 federation::discovery::get_server_version,
74 },
75 assign,
76 push::Ruleset,
77 time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, instrument, trace, warn};
82use url::Url;
83
84use self::futures::SendRequest;
85use crate::{
86 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
87 Room, SessionTokens, TransmissionProgress,
88 authentication::{
89 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
90 oauth::OAuth,
91 },
92 client::thread_subscriptions::ThreadSubscriptionCatchup,
93 config::{RequestConfig, SyncToken},
94 deduplicating_handler::DeduplicatingHandler,
95 error::HttpResult,
96 event_cache::EventCache,
97 event_handler::{
98 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
99 EventHandlerStore, ObservableEventHandler, SyncEvent,
100 },
101 http_client::{HttpClient, SupportedAuthScheme},
102 latest_events::LatestEvents,
103 media::MediaError,
104 notification_settings::NotificationSettings,
105 room::RoomMember,
106 room_preview::RoomPreview,
107 send_queue::{SendQueue, SendQueueData},
108 sliding_sync::Version as SlidingSyncVersion,
109 sync::{RoomUpdate, SyncResponse},
110};
111#[cfg(feature = "e2e-encryption")]
112use crate::{
113 cross_process_lock::CrossProcessLock,
114 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
115};
116
117mod builder;
118pub(crate) mod caches;
119pub(crate) mod futures;
120pub(crate) mod thread_subscriptions;
121
122pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
123#[cfg(feature = "experimental-search")]
124use crate::search_index::SearchIndex;
125
/// Boxed, pinned future returned by a notification handler.
///
/// Outside of wasm the future must also be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a notification handler.
///
/// wasm variant: no `Send` bound is required.
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
130
/// Boxed notification handler: a callable taking the [`Notification`], the
/// [`Room`] and the [`Client`], and returning a [`NotificationHandlerFut`].
///
/// Outside of wasm the handler must also be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler: a callable taking the [`Notification`], the
/// [`Room`] and the [`Client`], and returning a [`NotificationHandlerFut`].
///
/// wasm variant: no `Send`/`Sync` bounds are required.
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
136
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
150
/// Represents changes that can occur to a `Client`'s `Session`.
///
/// Equality is total (the only payload is a `bool`), so the type implements
/// `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
162
/// Information about the server vendor obtained from the federation API.
///
/// Produced by `Client::server_vendor_info`, which substitutes the string
/// `"unknown"` for any field the server response leaves out.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The server name.
    pub server_name: String,
    /// The server version.
    pub version: String,
}
172
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely;
/// every clone shares the same underlying `ClientInner`.
#[derive(Clone)]
pub struct Client {
    /// The shared, reference-counted state of the client.
    pub(crate) inner: Arc<ClientInner>,
}
180
/// Locks and request-deduplication handlers used by individual client
/// methods.
///
/// The whole collection is created through `Default`, i.e. every lock starts
/// unlocked and every cell starts empty.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): `Self` is `ClientLocks` here, and no `send_single_receipt`
    // method is visible on it in this file; confirm the intended link target.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock over the crypto store.
    ///
    /// Kept in a `OnceCell`, which starts out empty because this struct is
    /// built through `Default`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
259
/// The shared state behind a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`, so all clones of a `Client`
/// observe and mutate the same `ClientInner`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    #[cfg(feature = "experimental-search")]
    /// Handler for [`RoomIndex`]'s of each room
    search_index: SearchIndex,
}
380
381impl ClientInner {
382 /// Create a new `ClientInner`.
383 ///
384 /// All the fields passed as parameters here are those that must be cloned
385 /// upon instantiation of a sub-client, e.g. a client specialized for
386 /// notifications.
387 #[allow(clippy::too_many_arguments)]
388 async fn new(
389 auth_ctx: Arc<AuthCtx>,
390 server: Option<Url>,
391 homeserver: Url,
392 sliding_sync_version: SlidingSyncVersion,
393 http_client: HttpClient,
394 base_client: BaseClient,
395 server_info: ClientServerInfo,
396 respect_login_well_known: bool,
397 event_cache: OnceCell<EventCache>,
398 send_queue: Arc<SendQueueData>,
399 latest_events: OnceCell<LatestEvents>,
400 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
401 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
402 cross_process_store_locks_holder_name: String,
403 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
404 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
405 ) -> Arc<Self> {
406 let caches = ClientCaches {
407 server_info: server_info.into(),
408 server_metadata: Mutex::new(TtlCache::new()),
409 };
410
411 let client = Self {
412 server,
413 homeserver: StdRwLock::new(homeserver),
414 auth_ctx,
415 sliding_sync_version: StdRwLock::new(sliding_sync_version),
416 http_client,
417 base_client,
418 caches,
419 locks: Default::default(),
420 cross_process_store_locks_holder_name,
421 typing_notice_times: Default::default(),
422 event_handlers: Default::default(),
423 notification_handlers: Default::default(),
424 room_update_channels: Default::default(),
425 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
426 // ballast for all observers to catch up.
427 room_updates_sender: broadcast::Sender::new(32),
428 respect_login_well_known,
429 sync_beat: event_listener::Event::new(),
430 event_cache,
431 send_queue_data: send_queue,
432 latest_events,
433 #[cfg(feature = "e2e-encryption")]
434 e2ee: EncryptionData::new(encryption_settings),
435 #[cfg(feature = "e2e-encryption")]
436 verification_state: SharedObservable::new(VerificationState::Unknown),
437 #[cfg(feature = "e2e-encryption")]
438 enable_share_history_on_invite,
439 server_max_upload_size: Mutex::new(OnceCell::new()),
440 #[cfg(feature = "experimental-search")]
441 search_index: search_index_handler,
442 thread_subscription_catchup,
443 };
444
445 #[allow(clippy::let_and_return)]
446 let client = Arc::new(client);
447
448 #[cfg(feature = "e2e-encryption")]
449 client.e2ee.initialize_tasks(&client);
450
451 let _ = client
452 .event_cache
453 .get_or_init(|| async {
454 EventCache::new(
455 WeakClient::from_inner(&client),
456 client.base_client.event_cache_store().clone(),
457 )
458 })
459 .await;
460
461 let _ = client
462 .thread_subscription_catchup
463 .get_or_init(|| async {
464 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
465 })
466 .await;
467
468 client
469 }
470}
471
472#[cfg(not(tarpaulin_include))]
473impl Debug for Client {
474 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
475 write!(fmt, "Client")
476 }
477}
478
479impl Client {
480 /// Create a new [`Client`] that will use the given homeserver.
481 ///
482 /// # Arguments
483 ///
484 /// * `homeserver_url` - The homeserver that the client should connect to.
485 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
486 Self::builder().homeserver_url(homeserver_url).build().await
487 }
488
489 /// Returns a subscriber that publishes an event every time the ignore user
490 /// list changes.
491 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
492 self.inner.base_client.subscribe_to_ignore_user_list_changes()
493 }
494
495 /// Create a new [`ClientBuilder`].
496 pub fn builder() -> ClientBuilder {
497 ClientBuilder::new()
498 }
499
500 pub(crate) fn base_client(&self) -> &BaseClient {
501 &self.inner.base_client
502 }
503
504 /// The underlying HTTP client.
505 pub fn http_client(&self) -> &reqwest::Client {
506 &self.inner.http_client.inner
507 }
508
509 pub(crate) fn locks(&self) -> &ClientLocks {
510 &self.inner.locks
511 }
512
513 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
514 &self.inner.auth_ctx
515 }
516
517 /// The cross-process store locks holder name.
518 ///
519 /// The SDK provides cross-process store locks (see
520 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
521 /// `holder_name` is the value used for all cross-process store locks
522 /// used by this `Client`.
523 pub fn cross_process_store_locks_holder_name(&self) -> &str {
524 &self.inner.cross_process_store_locks_holder_name
525 }
526
527 /// Change the homeserver URL used by this client.
528 ///
529 /// # Arguments
530 ///
531 /// * `homeserver_url` - The new URL to use.
532 fn set_homeserver(&self, homeserver_url: Url) {
533 *self.inner.homeserver.write().unwrap() = homeserver_url;
534 }
535
/// Change to a different homeserver and re-resolve well-known.
///
/// The given URL is installed first and the cached server info is reset.
/// The server info is then loaded (or fetched) again; if it carries a
/// well-known entry, the homeserver is switched once more to the base URL
/// advertised there.
///
/// # Errors
///
/// Fails if resetting or loading the server info fails, or if the
/// well-known base URL cannot be parsed as a URL.
#[cfg(feature = "e2e-encryption")]
pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
    &self,
    homeserver_url: Url,
) -> Result<()> {
    self.set_homeserver(homeserver_url);
    self.reset_server_info().await?;

    let server_info = self.load_or_fetch_server_info().await?;
    if let Some(well_known) = &server_info.well_known {
        let resolved = Url::parse(&well_known.homeserver.base_url)?;
        self.set_homeserver(resolved);
    }

    Ok(())
}
551
552 /// Get the capabilities of the homeserver.
553 ///
554 /// This method should be used to check what features are supported by the
555 /// homeserver.
556 ///
557 /// # Examples
558 ///
559 /// ```no_run
560 /// # use matrix_sdk::Client;
561 /// # use url::Url;
562 /// # async {
563 /// # let homeserver = Url::parse("http://example.com")?;
564 /// let client = Client::new(homeserver).await?;
565 ///
566 /// let capabilities = client.get_capabilities().await?;
567 ///
568 /// if capabilities.change_password.enabled {
569 /// // Change password
570 /// }
571 /// # anyhow::Ok(()) };
572 /// ```
573 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
574 let res = self.send(get_capabilities::v3::Request::new()).await?;
575 Ok(res.capabilities)
576 }
577
578 /// Get the server vendor information from the federation API.
579 ///
580 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
581 /// both the server's software name and version.
582 ///
583 /// # Examples
584 ///
585 /// ```no_run
586 /// # use matrix_sdk::Client;
587 /// # use url::Url;
588 /// # async {
589 /// # let homeserver = Url::parse("http://example.com")?;
590 /// let client = Client::new(homeserver).await?;
591 ///
592 /// let server_info = client.server_vendor_info(None).await?;
593 /// println!(
594 /// "Server: {}, Version: {}",
595 /// server_info.server_name, server_info.version
596 /// );
597 /// # anyhow::Ok(()) };
598 /// ```
599 pub async fn server_vendor_info(
600 &self,
601 request_config: Option<RequestConfig>,
602 ) -> HttpResult<ServerVendorInfo> {
603 let res = self
604 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
605 .await?;
606
607 // Extract server info, using defaults if fields are missing.
608 let server = res.server.unwrap_or_default();
609 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
610 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
611
612 Ok(ServerVendorInfo { server_name: server_name_str, version })
613 }
614
615 /// Get a copy of the default request config.
616 ///
617 /// The default request config is what's used when sending requests if no
618 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
619 /// function with such a parameter.
620 ///
621 /// If the default request config was not customized through
622 /// [`ClientBuilder`] when creating this `Client`, the returned value will
623 /// be equivalent to [`RequestConfig::default()`].
624 pub fn request_config(&self) -> RequestConfig {
625 self.inner.http_client.request_config
626 }
627
628 /// Check whether the client has been activated.
629 ///
630 /// A client is considered active when:
631 ///
632 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
633 /// is logged in,
634 /// 2. Has loaded cached data from storage,
635 /// 3. If encryption is enabled, it also initialized or restored its
636 /// `OlmMachine`.
637 pub fn is_active(&self) -> bool {
638 self.inner.base_client.is_active()
639 }
640
641 /// The server used by the client.
642 ///
643 /// See `Self::server` to learn more.
644 pub fn server(&self) -> Option<&Url> {
645 self.inner.server.as_ref()
646 }
647
648 /// The homeserver of the client.
649 pub fn homeserver(&self) -> Url {
650 self.inner.homeserver.read().unwrap().clone()
651 }
652
653 /// Get the sliding sync version.
654 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
655 self.inner.sliding_sync_version.read().unwrap().clone()
656 }
657
658 /// Override the sliding sync version.
659 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
660 let mut lock = self.inner.sliding_sync_version.write().unwrap();
661 *lock = version;
662 }
663
664 /// Get the Matrix user session meta information.
665 ///
666 /// If the client is currently logged in, this will return a
667 /// [`SessionMeta`] object which contains the user ID and device ID.
668 /// Otherwise it returns `None`.
669 pub fn session_meta(&self) -> Option<&SessionMeta> {
670 self.base_client().session_meta()
671 }
672
673 /// Returns a receiver that gets events for each room info update. To watch
674 /// for new events, use `receiver.resubscribe()`.
675 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
676 self.base_client().room_info_notable_update_receiver()
677 }
678
679 /// Performs a search for users.
680 /// The search is performed case-insensitively on user IDs and display names
681 ///
682 /// # Arguments
683 ///
684 /// * `search_term` - The search term for the search
685 /// * `limit` - The maximum number of results to return. Defaults to 10.
686 ///
687 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
688 pub async fn search_users(
689 &self,
690 search_term: &str,
691 limit: u64,
692 ) -> HttpResult<search_users::v3::Response> {
693 let mut request = search_users::v3::Request::new(search_term.to_owned());
694
695 if let Some(limit) = UInt::new(limit) {
696 request.limit = limit;
697 }
698
699 self.send(request).await
700 }
701
702 /// Get the user id of the current owner of the client.
703 pub fn user_id(&self) -> Option<&UserId> {
704 self.session_meta().map(|s| s.user_id.as_ref())
705 }
706
707 /// Get the device ID that identifies the current session.
708 pub fn device_id(&self) -> Option<&DeviceId> {
709 self.session_meta().map(|s| s.device_id.as_ref())
710 }
711
712 /// Get the current access token for this session.
713 ///
714 /// Will be `None` if the client has not been logged in.
715 pub fn access_token(&self) -> Option<String> {
716 self.auth_ctx().access_token()
717 }
718
719 /// Get the current tokens for this session.
720 ///
721 /// To be notified of changes in the session tokens, use
722 /// [`Client::subscribe_to_session_changes()`] or
723 /// [`Client::set_session_callbacks()`].
724 ///
725 /// Returns `None` if the client has not been logged in.
726 pub fn session_tokens(&self) -> Option<SessionTokens> {
727 self.auth_ctx().session_tokens()
728 }
729
730 /// Access the authentication API used to log in this client.
731 ///
732 /// Will be `None` if the client has not been logged in.
733 pub fn auth_api(&self) -> Option<AuthApi> {
734 match self.auth_ctx().auth_data.get()? {
735 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
736 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
737 }
738 }
739
740 /// Get the whole session info of this client.
741 ///
742 /// Will be `None` if the client has not been logged in.
743 ///
744 /// Can be used with [`Client::restore_session`] to restore a previously
745 /// logged-in session.
746 pub fn session(&self) -> Option<AuthSession> {
747 match self.auth_api()? {
748 AuthApi::Matrix(api) => api.session().map(Into::into),
749 AuthApi::OAuth(api) => api.full_session().map(Into::into),
750 }
751 }
752
753 /// Get a reference to the state store.
754 pub fn state_store(&self) -> &DynStateStore {
755 self.base_client().state_store()
756 }
757
758 /// Get a reference to the event cache store.
759 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
760 self.base_client().event_cache_store()
761 }
762
763 /// Get a reference to the media store.
764 pub fn media_store(&self) -> &MediaStoreLock {
765 self.base_client().media_store()
766 }
767
768 /// Access the native Matrix authentication API with this client.
769 pub fn matrix_auth(&self) -> MatrixAuth {
770 MatrixAuth::new(self.clone())
771 }
772
773 /// Get the account of the current owner of the client.
774 pub fn account(&self) -> Account {
775 Account::new(self.clone())
776 }
777
/// Get the encryption manager of the client.
///
/// The returned [`Encryption`] is built from a clone of this client.
#[cfg(feature = "e2e-encryption")]
pub fn encryption(&self) -> Encryption {
    let client = self.clone();
    Encryption::new(client)
}
783
784 /// Get the media manager of the client.
785 pub fn media(&self) -> Media {
786 Media::new(self.clone())
787 }
788
789 /// Get the pusher manager of the client.
790 pub fn pusher(&self) -> Pusher {
791 Pusher::new(self.clone())
792 }
793
794 /// Access the OAuth 2.0 API of the client.
795 pub fn oauth(&self) -> OAuth {
796 OAuth::new(self.clone())
797 }
798
799 /// Register a handler for a specific event type.
800 ///
801 /// The handler is a function or closure with one or more arguments. The
802 /// first argument is the event itself. All additional arguments are
803 /// "context" arguments: They have to implement [`EventHandlerContext`].
804 /// This trait is named that way because most of the types implementing it
805 /// give additional context about an event: The room it was in, its raw form
806 /// and other similar things. As two exceptions to this,
807 /// [`Client`] and [`EventHandlerHandle`] also implement the
808 /// `EventHandlerContext` trait so you don't have to clone your client
809 /// into the event handler manually and a handler can decide to remove
810 /// itself.
811 ///
812 /// Some context arguments are not universally applicable. A context
813 /// argument that isn't available for the given event type will result in
814 /// the event handler being skipped and an error being logged. The following
815 /// context argument types are only available for a subset of event types:
816 ///
817 /// * [`Room`] is only available for room-specific events, i.e. not for
818 /// events like global account data events or presence events.
819 ///
820 /// You can provide custom context via
821 /// [`add_event_handler_context`](Client::add_event_handler_context) and
822 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
823 /// into the event handler.
824 ///
825 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
826 ///
827 /// # Examples
828 ///
829 /// ```no_run
830 /// use matrix_sdk::{
831 /// deserialized_responses::EncryptionInfo,
832 /// event_handler::Ctx,
833 /// ruma::{
834 /// events::{
835 /// macros::EventContent,
836 /// push_rules::PushRulesEvent,
837 /// room::{
838 /// message::SyncRoomMessageEvent,
839 /// topic::SyncRoomTopicEvent,
840 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
841 /// },
842 /// },
843 /// push::Action,
844 /// Int, MilliSecondsSinceUnixEpoch,
845 /// },
846 /// Client, Room,
847 /// };
848 /// use serde::{Deserialize, Serialize};
849 ///
850 /// # async fn example(client: Client) {
851 /// client.add_event_handler(
852 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
853 /// // Common usage: Room event plus room and client.
854 /// },
855 /// );
856 /// client.add_event_handler(
857 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
858 /// async move {
859 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
860 /// // unencrypted events and events that were decrypted by the SDK.
861 /// }
862 /// },
863 /// );
864 /// client.add_event_handler(
865 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
866 /// async move {
867 /// // A `Vec<Action>` parameter allows you to know which push actions
868 /// // are applicable for an event. For example, an event with
869 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
870 /// // in the timeline.
871 /// }
872 /// },
873 /// );
874 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
875 /// // You can omit any or all arguments after the first.
876 /// });
877 ///
878 /// // Registering a temporary event handler:
879 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
880 /// /* Event handler */
881 /// });
882 /// client.remove_event_handler(handle);
883 ///
884 /// // Registering custom event handler context:
885 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
886 /// struct MyContext {
887 /// number: usize,
888 /// }
889 /// client.add_event_handler_context(MyContext { number: 5 });
890 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
891 /// // Use the context
892 /// });
893 ///
894 /// // This will handle membership events in joined rooms. Invites are special, see below.
895 /// client.add_event_handler(
896 /// |ev: SyncRoomMemberEvent| async move {},
897 /// );
898 ///
899 /// // To handle state events in invited rooms (including invite membership events),
900 /// // `StrippedRoomMemberEvent` should be used.
901 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
902 /// client.add_event_handler(
903 /// |ev: StrippedRoomMemberEvent| async move {},
904 /// );
905 ///
906 /// // Custom events work exactly the same way, you just need to declare
907 /// // the content struct and use the EventContent derive macro on it.
908 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
909 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
910 /// struct TokenEventContent {
911 /// token: String,
912 /// #[serde(rename = "exp")]
913 /// expires_at: MilliSecondsSinceUnixEpoch,
914 /// }
915 ///
916 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
917 /// todo!("Display the token");
918 /// });
919 ///
920 /// // Event handler closures can also capture local variables.
921 /// // Make sure they are cheap to clone though, because they will be cloned
922 /// // every time the closure is called.
923 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
924 ///
925 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
926 /// println!("Calling the handler with identifier {data}");
927 /// });
928 /// # }
929 /// ```
930 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
931 where
932 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
933 H: EventHandler<Ev, Ctx>,
934 {
935 self.add_event_handler_impl(handler, None)
936 }
937
938 /// Register a handler for a specific room, and event type.
939 ///
940 /// This method works the same way as
941 /// [`add_event_handler`][Self::add_event_handler], except that the handler
942 /// will only be called for events in the room with the specified ID. See
943 /// that method for more details on event handler functions.
944 ///
945 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
946 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
947 /// your use case.
948 pub fn add_room_event_handler<Ev, Ctx, H>(
949 &self,
950 room_id: &RoomId,
951 handler: H,
952 ) -> EventHandlerHandle
953 where
954 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
955 H: EventHandler<Ev, Ctx>,
956 {
957 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
958 }
959
960 /// Observe a specific event type.
961 ///
962 /// `Ev` represents the kind of event that will be observed. `Ctx`
963 /// represents the context that will come with the event. It relies on the
964 /// same mechanism as [`Client::add_event_handler`]. The main difference is
965 /// that it returns an [`ObservableEventHandler`] and doesn't require a
966 /// user-defined closure. It is possible to subscribe to the
967 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
968 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
969 /// Ctx)`.
970 ///
971 /// Be careful that only the most recent value can be observed. Subscribers
972 /// are notified when a new value is sent, but there is no guarantee
973 /// that they will see all values.
974 ///
975 /// # Example
976 ///
977 /// Let's see a classical usage:
978 ///
979 /// ```
980 /// use futures_util::StreamExt as _;
981 /// use matrix_sdk::{
982 /// Client, Room,
983 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
984 /// };
985 ///
986 /// # async fn example(client: Client) -> Option<()> {
987 /// let observer =
988 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
989 ///
990 /// let mut subscriber = observer.subscribe();
991 ///
992 /// let (event, (room, push_actions)) = subscriber.next().await?;
993 /// # Some(())
994 /// # }
995 /// ```
996 ///
997 /// Now let's see how to get several contexts that can be useful for you:
998 ///
999 /// ```
1000 /// use matrix_sdk::{
1001 /// Client, Room,
1002 /// deserialized_responses::EncryptionInfo,
1003 /// ruma::{
1004 /// events::room::{
1005 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1006 /// },
1007 /// push::Action,
1008 /// },
1009 /// };
1010 ///
1011 /// # async fn example(client: Client) {
1012 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1013 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1014 ///
1015 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1016 /// // to distinguish between unencrypted events and events that were decrypted
1017 /// // by the SDK.
1018 /// let _ = client
1019 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1020 /// );
1021 ///
1022 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1023 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1024 /// // should be highlighted in the timeline.
1025 /// let _ =
1026 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1027 ///
1028 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1029 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1030 /// # }
1031 /// ```
1032 ///
1033 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1034 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1035 where
1036 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1037 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1038 {
1039 self.observe_room_events_impl(None)
1040 }
1041
1042 /// Observe a specific room, and event type.
1043 ///
1044 /// This method works the same way as [`Client::observe_events`], except
1045 /// that the observability will only be applied for events in the room with
1046 /// the specified ID. See that method for more details.
1047 ///
1048 /// Be careful that only the most recent value can be observed. Subscribers
1049 /// are notified when a new value is sent, but there is no guarantee
1050 /// that they will see all values.
1051 pub fn observe_room_events<Ev, Ctx>(
1052 &self,
1053 room_id: &RoomId,
1054 ) -> ObservableEventHandler<(Ev, Ctx)>
1055 where
1056 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1057 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1058 {
1059 self.observe_room_events_impl(Some(room_id.to_owned()))
1060 }
1061
1062 /// Shared implementation for `Client::observe_events` and
1063 /// `Client::observe_room_events`.
1064 fn observe_room_events_impl<Ev, Ctx>(
1065 &self,
1066 room_id: Option<OwnedRoomId>,
1067 ) -> ObservableEventHandler<(Ev, Ctx)>
1068 where
1069 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1070 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1071 {
1072 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1073 // new value.
1074 let shared_observable = SharedObservable::new(None);
1075
1076 ObservableEventHandler::new(
1077 shared_observable.clone(),
1078 self.event_handler_drop_guard(self.add_event_handler_impl(
1079 move |event: Ev, context: Ctx| {
1080 shared_observable.set(Some((event, context)));
1081
1082 ready(())
1083 },
1084 room_id,
1085 )),
1086 )
1087 }
1088
1089 /// Remove the event handler associated with the handle.
1090 ///
1091 /// Note that you **must not** call `remove_event_handler` from the
1092 /// non-async part of an event handler, that is:
1093 ///
1094 /// ```ignore
1095 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1096 /// // ⚠ this will cause a deadlock ⚠
1097 /// client.remove_event_handler(handle);
1098 ///
1099 /// async move {
1100 /// // removing the event handler here is fine
1101 /// client.remove_event_handler(handle);
1102 /// }
1103 /// })
1104 /// ```
1105 ///
1106 /// Note also that handlers that remove themselves will still execute with
1107 /// events received in the same sync cycle.
1108 ///
1109 /// # Arguments
1110 ///
1111 /// `handle` - The [`EventHandlerHandle`] that is returned when
1112 /// registering the event handler with [`Client::add_event_handler`].
1113 ///
1114 /// # Examples
1115 ///
1116 /// ```no_run
1117 /// # use url::Url;
1118 /// # use tokio::sync::mpsc;
1119 /// #
1120 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1121 /// #
1122 /// use matrix_sdk::{
1123 /// Client, event_handler::EventHandlerHandle,
1124 /// ruma::events::room::member::SyncRoomMemberEvent,
1125 /// };
1126 /// #
1127 /// # futures_executor::block_on(async {
1128 /// # let client = matrix_sdk::Client::builder()
1129 /// # .homeserver_url(homeserver)
1130 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1131 /// # .build()
1132 /// # .await
1133 /// # .unwrap();
1134 ///
1135 /// client.add_event_handler(
1136 /// |ev: SyncRoomMemberEvent,
1137 /// client: Client,
1138 /// handle: EventHandlerHandle| async move {
1139 /// // Common usage: Check arriving Event is the expected one
1140 /// println!("Expected RoomMemberEvent received!");
1141 /// client.remove_event_handler(handle);
1142 /// },
1143 /// );
1144 /// # });
1145 /// ```
1146 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1147 self.inner.event_handlers.remove(handle);
1148 }
1149
1150 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1151 /// the given handle.
1152 ///
1153 /// When the returned value is dropped, the event handler will be removed.
1154 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1155 EventHandlerDropGuard::new(handle, self.clone())
1156 }
1157
1158 /// Add an arbitrary value for use as event handler context.
1159 ///
1160 /// The value can be obtained in an event handler by adding an argument of
1161 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1162 ///
1163 /// If a value of the same type has been added before, it will be
1164 /// overwritten.
1165 ///
1166 /// # Examples
1167 ///
1168 /// ```no_run
1169 /// use matrix_sdk::{
1170 /// Room, event_handler::Ctx,
1171 /// ruma::events::room::message::SyncRoomMessageEvent,
1172 /// };
1173 /// # #[derive(Clone)]
1174 /// # struct SomeType;
1175 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1176 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1177 /// # futures_executor::block_on(async {
1178 /// # let client = matrix_sdk::Client::builder()
1179 /// # .homeserver_url(homeserver)
1180 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1181 /// # .build()
1182 /// # .await
1183 /// # .unwrap();
1184 ///
1185 /// // Handle used to send messages to the UI part of the app
1186 /// let my_gui_handle: SomeType = obtain_gui_handle();
1187 ///
1188 /// client.add_event_handler_context(my_gui_handle.clone());
1189 /// client.add_event_handler(
1190 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1191 /// async move {
1192 /// // gui_handle.send(DisplayMessage { message: ev });
1193 /// }
1194 /// },
1195 /// );
1196 /// # });
1197 /// ```
1198 pub fn add_event_handler_context<T>(&self, ctx: T)
1199 where
1200 T: Clone + Send + Sync + 'static,
1201 {
1202 self.inner.event_handlers.add_context(ctx);
1203 }
1204
1205 /// Register a handler for a notification.
1206 ///
1207 /// Similar to [`Client::add_event_handler`], but only allows functions
1208 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1209 /// [`Client`] for now.
1210 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1211 where
1212 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1213 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1214 {
1215 self.inner.notification_handlers.write().await.push(Box::new(
1216 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1217 ));
1218
1219 self
1220 }
1221
1222 /// Subscribe to all updates for the room with the given ID.
1223 ///
1224 /// The returned receiver will receive a new message for each sync response
1225 /// that contains updates for that room.
1226 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1227 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1228 btree_map::Entry::Vacant(entry) => {
1229 let (tx, rx) = broadcast::channel(8);
1230 entry.insert(tx);
1231 rx
1232 }
1233 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1234 }
1235 }
1236
1237 /// Subscribe to all updates to all rooms, whenever any has been received in
1238 /// a sync response.
1239 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1240 self.inner.room_updates_sender.subscribe()
1241 }
1242
1243 pub(crate) async fn notification_handlers(
1244 &self,
1245 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1246 self.inner.notification_handlers.read().await
1247 }
1248
1249 /// Get all the rooms the client knows about.
1250 ///
1251 /// This will return the list of joined, invited, and left rooms.
1252 pub fn rooms(&self) -> Vec<Room> {
1253 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1254 }
1255
1256 /// Get all the rooms the client knows about, filtered by room state.
1257 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1258 self.base_client()
1259 .rooms_filtered(filter)
1260 .into_iter()
1261 .map(|room| Room::new(self.clone(), room))
1262 .collect()
1263 }
1264
1265 /// Get a stream of all the rooms, in addition to the existing rooms.
1266 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1267 let (rooms, stream) = self.base_client().rooms_stream();
1268
1269 let map_room = |room| Room::new(self.clone(), room);
1270
1271 (
1272 rooms.into_iter().map(map_room).collect(),
1273 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1274 )
1275 }
1276
1277 /// Returns the joined rooms this client knows about.
1278 pub fn joined_rooms(&self) -> Vec<Room> {
1279 self.rooms_filtered(RoomStateFilter::JOINED)
1280 }
1281
1282 /// Returns the invited rooms this client knows about.
1283 pub fn invited_rooms(&self) -> Vec<Room> {
1284 self.rooms_filtered(RoomStateFilter::INVITED)
1285 }
1286
1287 /// Returns the left rooms this client knows about.
1288 pub fn left_rooms(&self) -> Vec<Room> {
1289 self.rooms_filtered(RoomStateFilter::LEFT)
1290 }
1291
1292 /// Returns the joined space rooms this client knows about.
1293 pub fn joined_space_rooms(&self) -> Vec<Room> {
1294 self.base_client()
1295 .rooms_filtered(RoomStateFilter::JOINED)
1296 .into_iter()
1297 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1298 .collect()
1299 }
1300
1301 /// Get a room with the given room id.
1302 ///
1303 /// # Arguments
1304 ///
1305 /// `room_id` - The unique id of the room that should be fetched.
1306 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1307 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1308 }
1309
1310 /// Gets the preview of a room, whether the current user has joined it or
1311 /// not.
1312 pub async fn get_room_preview(
1313 &self,
1314 room_or_alias_id: &RoomOrAliasId,
1315 via: Vec<OwnedServerName>,
1316 ) -> Result<RoomPreview> {
1317 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1318 Ok(room_id) => room_id.to_owned(),
1319 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1320 };
1321
1322 if let Some(room) = self.get_room(&room_id) {
1323 // The cached data can only be trusted if the room state is joined or
1324 // banned: for invite and knock rooms, no updates will be received
1325 // for the rooms after the invite/knock action took place so we may
1326 // have very out to date data for important fields such as
1327 // `join_rule`. For left rooms, the homeserver should return the latest info.
1328 match room.state() {
1329 RoomState::Joined | RoomState::Banned => {
1330 return Ok(RoomPreview::from_known_room(&room).await);
1331 }
1332 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1333 }
1334 }
1335
1336 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1337 }
1338
1339 /// Resolve a room alias to a room id and a list of servers which know
1340 /// about it.
1341 ///
1342 /// # Arguments
1343 ///
1344 /// `room_alias` - The room alias to be resolved.
1345 pub async fn resolve_room_alias(
1346 &self,
1347 room_alias: &RoomAliasId,
1348 ) -> HttpResult<get_alias::v3::Response> {
1349 let request = get_alias::v3::Request::new(room_alias.to_owned());
1350 self.send(request).await
1351 }
1352
1353 /// Checks if a room alias is not in use yet.
1354 ///
1355 /// Returns:
1356 /// - `Ok(true)` if the room alias is available.
1357 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1358 /// status code).
1359 /// - An `Err` otherwise.
1360 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1361 match self.resolve_room_alias(alias).await {
1362 // The room alias was resolved, so it's already in use.
1363 Ok(_) => Ok(false),
1364 Err(error) => {
1365 match error.client_api_error_kind() {
1366 // The room alias wasn't found, so it's available.
1367 Some(ErrorKind::NotFound) => Ok(true),
1368 _ => Err(error),
1369 }
1370 }
1371 }
1372 }
1373
1374 /// Adds a new room alias associated with a room to the room directory.
1375 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1376 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1377 self.send(request).await?;
1378 Ok(())
1379 }
1380
1381 /// Removes a room alias from the room directory.
1382 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1383 let request = delete_alias::v3::Request::new(alias.to_owned());
1384 self.send(request).await?;
1385 Ok(())
1386 }
1387
1388 /// Update the homeserver from the login response well-known if needed.
1389 ///
1390 /// # Arguments
1391 ///
1392 /// * `login_well_known` - The `well_known` field from a successful login
1393 /// response.
1394 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1395 if self.inner.respect_login_well_known
1396 && let Some(well_known) = login_well_known
1397 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1398 {
1399 self.set_homeserver(homeserver);
1400 }
1401 }
1402
1403 /// Similar to [`Client::restore_session_with`], with
1404 /// [`RoomLoadSettings::default()`].
1405 ///
1406 /// # Panics
1407 ///
1408 /// Panics if a session was already restored or logged in.
1409 #[instrument(skip_all)]
1410 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1411 self.restore_session_with(session, RoomLoadSettings::default()).await
1412 }
1413
1414 /// Restore a session previously logged-in using one of the available
1415 /// authentication APIs. The number of rooms to restore is controlled by
1416 /// [`RoomLoadSettings`].
1417 ///
1418 /// See the documentation of the corresponding authentication API's
1419 /// `restore_session` method for more information.
1420 ///
1421 /// # Panics
1422 ///
1423 /// Panics if a session was already restored or logged in.
1424 #[instrument(skip_all)]
1425 pub async fn restore_session_with(
1426 &self,
1427 session: impl Into<AuthSession>,
1428 room_load_settings: RoomLoadSettings,
1429 ) -> Result<()> {
1430 let session = session.into();
1431 match session {
1432 AuthSession::Matrix(session) => {
1433 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1434 }
1435 AuthSession::OAuth(session) => {
1436 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1437 }
1438 }
1439 }
1440
1441 /// Refresh the access token using the authentication API used to log into
1442 /// this session.
1443 ///
1444 /// See the documentation of the authentication API's `refresh_access_token`
1445 /// method for more information.
1446 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1447 let Some(auth_api) = self.auth_api() else {
1448 return Err(RefreshTokenError::RefreshTokenRequired);
1449 };
1450
1451 match auth_api {
1452 AuthApi::Matrix(api) => {
1453 trace!("Token refresh: Using the homeserver.");
1454 Box::pin(api.refresh_access_token()).await?;
1455 }
1456 AuthApi::OAuth(api) => {
1457 trace!("Token refresh: Using OAuth 2.0.");
1458 Box::pin(api.refresh_access_token()).await?;
1459 }
1460 }
1461
1462 Ok(())
1463 }
1464
1465 /// Log out the current session using the proper authentication API.
1466 ///
1467 /// # Errors
1468 ///
1469 /// Returns an error if the session is not authenticated or if an error
1470 /// occurred while making the request to the server.
1471 pub async fn logout(&self) -> Result<(), Error> {
1472 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1473 match auth_api {
1474 AuthApi::Matrix(matrix_auth) => {
1475 matrix_auth.logout().await?;
1476 Ok(())
1477 }
1478 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1479 }
1480 }
1481
1482 /// Get or upload a sync filter.
1483 ///
1484 /// This method will either get a filter ID from the store or upload the
1485 /// filter definition to the homeserver and return the new filter ID.
1486 ///
1487 /// # Arguments
1488 ///
1489 /// * `filter_name` - The unique name of the filter, this name will be used
1490 /// locally to store and identify the filter ID returned by the server.
1491 ///
1492 /// * `definition` - The filter definition that should be uploaded to the
1493 /// server if no filter ID can be found in the store.
1494 ///
1495 /// # Examples
1496 ///
1497 /// ```no_run
1498 /// # use matrix_sdk::{
1499 /// # Client, config::SyncSettings,
1500 /// # ruma::api::client::{
1501 /// # filter::{
1502 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1503 /// # },
1504 /// # sync::sync_events::v3::Filter,
1505 /// # }
1506 /// # };
1507 /// # use url::Url;
1508 /// # async {
1509 /// # let homeserver = Url::parse("http://example.com").unwrap();
1510 /// # let client = Client::new(homeserver).await.unwrap();
1511 /// let mut filter = FilterDefinition::default();
1512 ///
1513 /// // Let's enable member lazy loading.
1514 /// filter.room.state.lazy_load_options =
1515 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1516 ///
1517 /// let filter_id = client
1518 /// .get_or_upload_filter("sync", filter)
1519 /// .await
1520 /// .unwrap();
1521 ///
1522 /// let sync_settings = SyncSettings::new()
1523 /// .filter(Filter::FilterId(filter_id));
1524 ///
1525 /// let response = client.sync_once(sync_settings).await.unwrap();
/// # };
/// ```
1527 #[instrument(skip(self, definition))]
1528 pub async fn get_or_upload_filter(
1529 &self,
1530 filter_name: &str,
1531 definition: FilterDefinition,
1532 ) -> Result<String> {
1533 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1534 debug!("Found filter locally");
1535 Ok(filter)
1536 } else {
1537 debug!("Didn't find filter locally");
1538 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1539 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1540 let response = self.send(request).await?;
1541
1542 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1543
1544 Ok(response.filter_id)
1545 }
1546 }
1547
1548 /// Prepare to join a room by ID, by getting the current details about it
1549 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1550 let room = self.get_room(room_id)?;
1551
1552 let inviter = match room.invite_details().await {
1553 Ok(details) => details.inviter,
1554 Err(Error::WrongRoomState(_)) => None,
1555 Err(e) => {
1556 warn!("Error fetching invite details for room: {e:?}");
1557 None
1558 }
1559 };
1560
1561 Some(PreJoinRoomInfo { inviter })
1562 }
1563
1564 /// Finish joining a room.
1565 ///
1566 /// If the room was an invite that should be marked as a DM, will include it
1567 /// in the DM event after creating the joined room.
1568 ///
1569 /// If encrypted history sharing is enabled, will check to see if we have a
1570 /// key bundle, and import it if so.
1571 ///
1572 /// # Arguments
1573 ///
1574 /// * `room_id` - The `RoomId` of the room that was joined.
1575 /// * `pre_join_room_info` - Information about the room before we joined.
1576 async fn finish_join_room(
1577 &self,
1578 room_id: &RoomId,
1579 pre_join_room_info: Option<PreJoinRoomInfo>,
1580 ) -> Result<Room> {
1581 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1582 room.state() == RoomState::Invited
1583 && room.is_direct().await.unwrap_or_else(|e| {
1584 warn!(%room_id, "is_direct() failed: {e}");
1585 false
1586 })
1587 } else {
1588 false
1589 };
1590
1591 let base_room = self
1592 .base_client()
1593 .room_joined(
1594 room_id,
1595 pre_join_room_info
1596 .as_ref()
1597 .and_then(|info| info.inviter.as_ref())
1598 .map(|i| i.user_id().to_owned()),
1599 )
1600 .await?;
1601 let room = Room::new(self.clone(), base_room);
1602
1603 if mark_as_dm {
1604 room.set_is_direct(true).await?;
1605 }
1606
1607 #[cfg(feature = "e2e-encryption")]
1608 if self.inner.enable_share_history_on_invite
1609 && let Some(inviter) =
1610 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1611 {
1612 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1613 .await?;
1614 }
1615
1616 // Suppress "unused variable" and "unused field" lints
1617 #[cfg(not(feature = "e2e-encryption"))]
1618 let _ = pre_join_room_info.map(|i| i.inviter);
1619
1620 Ok(room)
1621 }
1622
1623 /// Join a room by `RoomId`.
1624 ///
1625 /// Returns the `Room` in the joined state.
1626 ///
1627 /// # Arguments
1628 ///
1629 /// * `room_id` - The `RoomId` of the room to be joined.
1630 #[instrument(skip(self))]
1631 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1632 // See who invited us to this room, if anyone. Note we have to do this before
1633 // making the `/join` request, otherwise we could race against the sync.
1634 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1635
1636 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1637 let response = self.send(request).await?;
1638 self.finish_join_room(&response.room_id, pre_join_info).await
1639 }
1640
1641 /// Join a room by `RoomOrAliasId`.
1642 ///
1643 /// Returns the `Room` in the joined state.
1644 ///
1645 /// # Arguments
1646 ///
1647 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1648 /// alias looks like `#name:example.com`.
1649 /// * `server_names` - The server names to be used for resolving the alias,
1650 /// if needs be.
1651 pub async fn join_room_by_id_or_alias(
1652 &self,
1653 alias: &RoomOrAliasId,
1654 server_names: &[OwnedServerName],
1655 ) -> Result<Room> {
1656 let pre_join_info = {
1657 match alias.try_into() {
1658 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1659 Err(_) => {
1660 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1661 // responding to an invitation to the room, and therefore don't need to handle
1662 // things that happen as a result of invites.
1663 None
1664 }
1665 }
1666 };
1667 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1668 via: server_names.to_owned(),
1669 });
1670 let response = self.send(request).await?;
1671 self.finish_join_room(&response.room_id, pre_join_info).await
1672 }
1673
1674 /// Search the homeserver's directory of public rooms.
1675 ///
1676 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1677 /// a `get_public_rooms::Response`.
1678 ///
1679 /// # Arguments
1680 ///
1681 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1682 ///
1683 /// * `since` - Pagination token from a previous request.
1684 ///
1685 /// * `server` - The name of the server, if `None` the requested server is
1686 /// used.
1687 ///
1688 /// # Examples
1689 /// ```no_run
1690 /// use matrix_sdk::Client;
1691 /// # use url::Url;
1692 /// # let homeserver = Url::parse("http://example.com").unwrap();
1693 /// # let limit = Some(10);
1694 /// # let since = Some("since token");
1695 /// # let server = Some("servername.com".try_into().unwrap());
1696 /// # async {
1697 /// let mut client = Client::new(homeserver).await.unwrap();
1698 ///
1699 /// client.public_rooms(limit, since, server).await;
1700 /// # };
1701 /// ```
1702 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1703 pub async fn public_rooms(
1704 &self,
1705 limit: Option<u32>,
1706 since: Option<&str>,
1707 server: Option<&ServerName>,
1708 ) -> HttpResult<get_public_rooms::v3::Response> {
1709 let limit = limit.map(UInt::from);
1710
1711 let request = assign!(get_public_rooms::v3::Request::new(), {
1712 limit,
1713 since: since.map(ToOwned::to_owned),
1714 server: server.map(ToOwned::to_owned),
1715 });
1716 self.send(request).await
1717 }
1718
1719 /// Create a room with the given parameters.
1720 ///
1721 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1722 /// created room.
1723 ///
1724 /// If you want to create a direct message with one specific user, you can
1725 /// use [`create_dm`][Self::create_dm], which is more convenient than
1726 /// assembling the [`create_room::v3::Request`] yourself.
1727 ///
1728 /// If the `is_direct` field of the request is set to `true` and at least
1729 /// one user is invited, the room will be automatically added to the direct
1730 /// rooms in the account data.
1731 ///
1732 /// # Examples
1733 ///
1734 /// ```no_run
1735 /// use matrix_sdk::{
1736 /// Client,
1737 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1738 /// };
1739 /// # use url::Url;
1740 /// #
1741 /// # async {
1742 /// # let homeserver = Url::parse("http://example.com").unwrap();
1743 /// let request = CreateRoomRequest::new();
1744 /// let client = Client::new(homeserver).await.unwrap();
1745 /// assert!(client.create_room(request).await.is_ok());
1746 /// # };
1747 /// ```
1748 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1749 let invite = request.invite.clone();
1750 let is_direct_room = request.is_direct;
1751 let response = self.send(request).await?;
1752
1753 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1754
1755 let joined_room = Room::new(self.clone(), base_room);
1756
1757 if is_direct_room
1758 && !invite.is_empty()
1759 && let Err(error) =
1760 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1761 {
1762 // FIXME: Retry in the background
1763 error!("Failed to mark room as DM: {error}");
1764 }
1765
1766 Ok(joined_room)
1767 }
1768
1769 /// Create a DM room.
1770 ///
1771 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1772 /// given user being invited, the room marked `is_direct` and both the
1773 /// creator and invitee getting the default maximum power level.
1774 ///
1775 /// If the `e2e-encryption` feature is enabled, the room will also be
1776 /// encrypted.
1777 ///
1778 /// # Arguments
1779 ///
1780 /// * `user_id` - The ID of the user to create a DM for.
1781 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1782 #[cfg(feature = "e2e-encryption")]
1783 let initial_state = vec![
1784 InitialStateEvent::with_empty_state_key(
1785 RoomEncryptionEventContent::with_recommended_defaults(),
1786 )
1787 .to_raw_any(),
1788 ];
1789
1790 #[cfg(not(feature = "e2e-encryption"))]
1791 let initial_state = vec![];
1792
1793 let request = assign!(create_room::v3::Request::new(), {
1794 invite: vec![user_id.to_owned()],
1795 is_direct: true,
1796 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1797 initial_state,
1798 });
1799
1800 self.create_room(request).await
1801 }
1802
1803 /// Search the homeserver's directory for public rooms with a filter.
1804 ///
1805 /// # Arguments
1806 ///
1807 /// * `room_search` - The easiest way to create this request is using the
1808 /// `get_public_rooms_filtered::Request` itself.
1809 ///
1810 /// # Examples
1811 ///
1812 /// ```no_run
1813 /// # use url::Url;
1814 /// # use matrix_sdk::Client;
1815 /// # async {
1816 /// # let homeserver = Url::parse("http://example.com")?;
1817 /// use matrix_sdk::ruma::{
1818 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1819 /// };
1820 /// # let mut client = Client::new(homeserver).await?;
1821 ///
1822 /// let mut filter = Filter::new();
1823 /// filter.generic_search_term = Some("rust".to_owned());
1824 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1825 /// request.filter = filter;
1826 ///
1827 /// let response = client.public_rooms_filtered(request).await?;
1828 ///
1829 /// for room in response.chunk {
1830 /// println!("Found room {room:?}");
1831 /// }
1832 /// # anyhow::Ok(()) };
1833 /// ```
1834 pub async fn public_rooms_filtered(
1835 &self,
1836 request: get_public_rooms_filtered::v3::Request,
1837 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1838 self.send(request).await
1839 }
1840
1841 /// Send an arbitrary request to the server, without updating client state.
1842 ///
1843 /// **Warning:** Because this method *does not* update the client state, it
1844 /// is important to make sure that you account for this yourself, and
1845 /// use wrapper methods where available. This method should *only* be
1846 /// used if a wrapper method for the endpoint you'd like to use is not
1847 /// available.
1848 ///
1849 /// # Arguments
1850 ///
1851 /// * `request` - A filled out and valid request for the endpoint to be hit
1852 ///
1853 /// * `timeout` - An optional request timeout setting, this overrides the
1854 /// default request setting if one was set.
1855 ///
1856 /// # Examples
1857 ///
1858 /// ```no_run
1859 /// # use matrix_sdk::{Client, config::SyncSettings};
1860 /// # use url::Url;
1861 /// # async {
1862 /// # let homeserver = Url::parse("http://localhost:8080")?;
1863 /// # let mut client = Client::new(homeserver).await?;
1864 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1865 ///
1866 /// // First construct the request you want to make
1867 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1868 /// // for all available Endpoints
1869 /// let user_id = user_id!("@example:localhost").to_owned();
1870 /// let request = profile::get_profile::v3::Request::new(user_id);
1871 ///
1872 /// // Start the request using Client::send()
1873 /// let response = client.send(request).await?;
1874 ///
1875 /// // Check the corresponding Response struct to find out what types are
1876 /// // returned
1877 /// # anyhow::Ok(()) };
1878 /// ```
1879 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1880 where
1881 Request: OutgoingRequest + Clone + Debug,
1882 Request::Authentication: SupportedAuthScheme,
1883 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1884 {
1885 SendRequest {
1886 client: self.clone(),
1887 request,
1888 config: None,
1889 send_progress: Default::default(),
1890 }
1891 }
1892
1893 pub(crate) async fn send_inner<Request>(
1894 &self,
1895 request: Request,
1896 config: Option<RequestConfig>,
1897 send_progress: SharedObservable<TransmissionProgress>,
1898 ) -> HttpResult<Request::IncomingResponse>
1899 where
1900 Request: OutgoingRequest + Debug,
1901 Request::Authentication: SupportedAuthScheme,
1902 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1903 {
1904 let homeserver = self.homeserver().to_string();
1905 let access_token = self.access_token();
1906
1907 self.inner
1908 .http_client
1909 .send(
1910 request,
1911 config,
1912 homeserver,
1913 access_token.as_deref(),
1914 &self.supported_versions().await?,
1915 send_progress,
1916 )
1917 .await
1918 }
1919
1920 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1921 _ = self
1922 .inner
1923 .auth_ctx
1924 .session_change_sender
1925 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1926 }
1927
1928 /// Fetches server versions from network; no caching.
1929 pub async fn fetch_server_versions(
1930 &self,
1931 request_config: Option<RequestConfig>,
1932 ) -> HttpResult<get_supported_versions::Response> {
1933 let server_versions = self
1934 .inner
1935 .http_client
1936 .send(
1937 get_supported_versions::Request::new(),
1938 request_config,
1939 self.homeserver().to_string(),
1940 None,
1941 &SupportedVersions {
1942 versions: [MatrixVersion::V1_0].into(),
1943 features: Default::default(),
1944 },
1945 Default::default(),
1946 )
1947 .await?;
1948
1949 Ok(server_versions)
1950 }
1951
1952 /// Fetches client well_known from network; no caching.
1953 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1954 let server_url_string = self
1955 .server()
1956 .unwrap_or(
1957 // Sometimes people configure their well-known directly on the homeserver so use
1958 // this as a fallback when the server name is unknown.
1959 &self.homeserver(),
1960 )
1961 .to_string();
1962
1963 let well_known = self
1964 .inner
1965 .http_client
1966 .send(
1967 discover_homeserver::Request::new(),
1968 Some(RequestConfig::short_retry()),
1969 server_url_string,
1970 None,
1971 &SupportedVersions {
1972 versions: [MatrixVersion::V1_0].into(),
1973 features: Default::default(),
1974 },
1975 Default::default(),
1976 )
1977 .await;
1978
1979 match well_known {
1980 Ok(well_known) => Some(well_known),
1981 Err(http_error) => {
1982 // It is perfectly valid to not have a well-known file.
1983 // Maybe we should check for a specific error code to be sure?
1984 warn!("Failed to fetch client well-known: {http_error}");
1985 None
1986 }
1987 }
1988 }
1989
1990 /// Load server info from storage, or fetch them from network and cache
1991 /// them.
1992 async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1993 match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1994 Ok(Some(stored)) => {
1995 if let Some(server_info) =
1996 stored.into_server_info().and_then(|info| info.maybe_decode())
1997 {
1998 return Ok(server_info);
1999 }
2000 }
2001 Ok(None) => {
2002 // fallthrough: cache is empty
2003 }
2004 Err(err) => {
2005 warn!("error when loading cached server info: {err}");
2006 // fallthrough to network.
2007 }
2008 }
2009
2010 let server_versions = self.fetch_server_versions(None).await?;
2011 let well_known = self.fetch_client_well_known().await;
2012 let server_info = ServerInfo::new(
2013 server_versions.versions.clone(),
2014 server_versions.unstable_features.clone(),
2015 well_known.map(Into::into),
2016 );
2017
2018 // Attempt to cache the result in storage.
2019 {
2020 if let Err(err) = self
2021 .state_store()
2022 .set_kv_data(
2023 StateStoreDataKey::ServerInfo,
2024 StateStoreDataValue::ServerInfo(server_info.clone()),
2025 )
2026 .await
2027 {
2028 warn!("error when caching server info: {err}");
2029 }
2030 }
2031
2032 Ok(server_info)
2033 }
2034
2035 async fn get_or_load_and_cache_server_info<
2036 Value,
2037 MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
2038 >(
2039 &self,
2040 map: MapFunction,
2041 ) -> HttpResult<Value> {
2042 let server_info = &self.inner.caches.server_info;
2043 if let CachedValue::Cached(val) = map(&*server_info.read().await) {
2044 return Ok(val);
2045 }
2046
2047 let mut guarded_server_info = server_info.write().await;
2048 if let CachedValue::Cached(val) = map(&guarded_server_info) {
2049 return Ok(val);
2050 }
2051
2052 let server_info = self.load_or_fetch_server_info().await?;
2053
2054 // Fill both unstable features and server versions at once.
2055 let mut supported = server_info.supported_versions();
2056 if supported.versions.is_empty() {
2057 supported.versions = [MatrixVersion::V1_0].into();
2058 }
2059
2060 guarded_server_info.supported_versions = CachedValue::Cached(supported);
2061 guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2062
2063 // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2064 // fetch an optional property), the function will always return some.
2065 Ok(map(&guarded_server_info).unwrap_cached_value())
2066 }
2067
2068 /// Get the Matrix versions and features supported by the homeserver by
2069 /// fetching them from the server or the cache.
2070 ///
2071 /// This is equivalent to calling both [`Client::server_versions()`] and
2072 /// [`Client::unstable_features()`]. To always fetch the result from the
2073 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2074 /// and then `.as_supported_versions()` on the response.
2075 ///
2076 /// # Examples
2077 ///
2078 /// ```no_run
2079 /// use ruma::api::{FeatureFlag, MatrixVersion};
2080 /// # use matrix_sdk::{Client, config::SyncSettings};
2081 /// # use url::Url;
2082 /// # async {
2083 /// # let homeserver = Url::parse("http://localhost:8080")?;
2084 /// # let mut client = Client::new(homeserver).await?;
2085 ///
2086 /// let supported = client.supported_versions().await?;
2087 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2088 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2089 ///
2090 /// let msc_x_feature = FeatureFlag::from("msc_x");
2091 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2092 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2093 /// # anyhow::Ok(()) };
2094 /// ```
2095 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2096 self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2097 .await
2098 }
2099
2100 /// Get the Matrix versions supported by the homeserver by fetching them
2101 /// from the server or the cache.
2102 ///
2103 /// # Examples
2104 ///
2105 /// ```no_run
2106 /// use ruma::api::MatrixVersion;
2107 /// # use matrix_sdk::{Client, config::SyncSettings};
2108 /// # use url::Url;
2109 /// # async {
2110 /// # let homeserver = Url::parse("http://localhost:8080")?;
2111 /// # let mut client = Client::new(homeserver).await?;
2112 ///
2113 /// let server_versions = client.server_versions().await?;
2114 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2115 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2116 /// # anyhow::Ok(()) };
2117 /// ```
2118 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2119 self.get_or_load_and_cache_server_info(|server_info| {
2120 server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2121 })
2122 .await
2123 }
2124
2125 /// Get the unstable features supported by the homeserver by fetching them
2126 /// from the server or the cache.
2127 ///
2128 /// # Examples
2129 ///
2130 /// ```no_run
2131 /// use matrix_sdk::ruma::api::FeatureFlag;
2132 /// # use matrix_sdk::{Client, config::SyncSettings};
2133 /// # use url::Url;
2134 /// # async {
2135 /// # let homeserver = Url::parse("http://localhost:8080")?;
2136 /// # let mut client = Client::new(homeserver).await?;
2137 ///
2138 /// let msc_x_feature = FeatureFlag::from("msc_x");
2139 /// let unstable_features = client.unstable_features().await?;
2140 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2141 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2142 /// # anyhow::Ok(()) };
2143 /// ```
2144 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2145 self.get_or_load_and_cache_server_info(|server_info| {
2146 server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2147 })
2148 .await
2149 }
2150
2151 /// Get information about the homeserver's advertised RTC foci by fetching
2152 /// the well-known file from the server or the cache.
2153 ///
2154 /// # Examples
2155 /// ```no_run
2156 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2157 /// # use url::Url;
2158 /// # async {
2159 /// # let homeserver = Url::parse("http://localhost:8080")?;
2160 /// # let mut client = Client::new(homeserver).await?;
2161 /// let rtc_foci = client.rtc_foci().await?;
2162 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2163 /// RtcFocusInfo::LiveKit(info) => Some(info),
2164 /// _ => None,
2165 /// });
2166 /// if let Some(info) = default_livekit_focus_info {
2167 /// println!("Default LiveKit service URL: {}", info.service_url);
2168 /// }
2169 /// # anyhow::Ok(()) };
2170 /// ```
2171 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2172 let well_known = self
2173 .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2174 .await?;
2175
2176 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2177 }
2178
2179 /// Empty the server version and unstable features cache.
2180 ///
2181 /// Since the SDK caches server info (versions, unstable features,
2182 /// well-known etc), it's possible to have a stale entry in the cache. This
2183 /// functions makes it possible to force reset it.
2184 pub async fn reset_server_info(&self) -> Result<()> {
2185 // Empty the in-memory caches.
2186 let mut guard = self.inner.caches.server_info.write().await;
2187 guard.supported_versions = CachedValue::NotSet;
2188
2189 // Empty the store cache.
2190 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2191 }
2192
2193 /// Check whether MSC 4028 is enabled on the homeserver.
2194 ///
2195 /// # Examples
2196 ///
2197 /// ```no_run
2198 /// # use matrix_sdk::{Client, config::SyncSettings};
2199 /// # use url::Url;
2200 /// # async {
2201 /// # let homeserver = Url::parse("http://localhost:8080")?;
2202 /// # let mut client = Client::new(homeserver).await?;
2203 /// let msc4028_enabled =
2204 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2205 /// # anyhow::Ok(()) };
2206 /// ```
2207 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2208 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2209 }
2210
2211 /// Get information of all our own devices.
2212 ///
2213 /// # Examples
2214 ///
2215 /// ```no_run
2216 /// # use matrix_sdk::{Client, config::SyncSettings};
2217 /// # use url::Url;
2218 /// # async {
2219 /// # let homeserver = Url::parse("http://localhost:8080")?;
2220 /// # let mut client = Client::new(homeserver).await?;
2221 /// let response = client.devices().await?;
2222 ///
2223 /// for device in response.devices {
2224 /// println!(
2225 /// "Device: {} {}",
2226 /// device.device_id,
2227 /// device.display_name.as_deref().unwrap_or("")
2228 /// );
2229 /// }
2230 /// # anyhow::Ok(()) };
2231 /// ```
2232 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2233 let request = get_devices::v3::Request::new();
2234
2235 self.send(request).await
2236 }
2237
2238 /// Delete the given devices from the server.
2239 ///
2240 /// # Arguments
2241 ///
2242 /// * `devices` - The list of devices that should be deleted from the
2243 /// server.
2244 ///
2245 /// * `auth_data` - This request requires user interactive auth, the first
2246 /// request needs to set this to `None` and will always fail with an
2247 /// `UiaaResponse`. The response will contain information for the
2248 /// interactive auth and the same request needs to be made but this time
2249 /// with some `auth_data` provided.
2250 ///
2251 /// ```no_run
2252 /// # use matrix_sdk::{
2253 /// # ruma::{api::client::uiaa, device_id},
2254 /// # Client, Error, config::SyncSettings,
2255 /// # };
2256 /// # use serde_json::json;
2257 /// # use url::Url;
2258 /// # use std::collections::BTreeMap;
2259 /// # async {
2260 /// # let homeserver = Url::parse("http://localhost:8080")?;
2261 /// # let mut client = Client::new(homeserver).await?;
2262 /// let devices = &[device_id!("DEVICEID").to_owned()];
2263 ///
2264 /// if let Err(e) = client.delete_devices(devices, None).await {
2265 /// if let Some(info) = e.as_uiaa_response() {
2266 /// let mut password = uiaa::Password::new(
2267 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2268 /// "wordpass".to_owned(),
2269 /// );
2270 /// password.session = info.session.clone();
2271 ///
2272 /// client
2273 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2274 /// .await?;
2275 /// }
2276 /// }
2277 /// # anyhow::Ok(()) };
2278 pub async fn delete_devices(
2279 &self,
2280 devices: &[OwnedDeviceId],
2281 auth_data: Option<uiaa::AuthData>,
2282 ) -> HttpResult<delete_devices::v3::Response> {
2283 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2284 request.auth = auth_data;
2285
2286 self.send(request).await
2287 }
2288
2289 /// Change the display name of a device owned by the current user.
2290 ///
2291 /// Returns a `update_device::Response` which specifies the result
2292 /// of the operation.
2293 ///
2294 /// # Arguments
2295 ///
2296 /// * `device_id` - The ID of the device to change the display name of.
2297 /// * `display_name` - The new display name to set.
2298 pub async fn rename_device(
2299 &self,
2300 device_id: &DeviceId,
2301 display_name: &str,
2302 ) -> HttpResult<update_device::v3::Response> {
2303 let mut request = update_device::v3::Request::new(device_id.to_owned());
2304 request.display_name = Some(display_name.to_owned());
2305
2306 self.send(request).await
2307 }
2308
2309 /// Synchronize the client's state with the latest state on the server.
2310 ///
2311 /// ## Syncing Events
2312 ///
2313 /// Messages or any other type of event need to be periodically fetched from
2314 /// the server, this is achieved by sending a `/sync` request to the server.
2315 ///
2316 /// The first sync is sent out without a [`token`]. The response of the
2317 /// first sync will contain a [`next_batch`] field which should then be
2318 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2319 /// don't receive the same events multiple times.
2320 ///
2321 /// ## Long Polling
2322 ///
2323 /// A sync should in the usual case always be in flight. The
2324 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2325 /// long the server will wait for new events before it will respond.
2326 /// The server will respond immediately if some new events arrive before the
2327 /// timeout has expired. If no changes arrive and the timeout expires an
2328 /// empty sync response will be sent to the client.
2329 ///
2330 /// This method of sending a request that may not receive a response
2331 /// immediately is called long polling.
2332 ///
2333 /// ## Filtering Events
2334 ///
2335 /// The number or type of messages and events that the client should receive
2336 /// from the server can be altered using a [`Filter`].
2337 ///
2338 /// Filters can be non-trivial and, since they will be sent with every sync
2339 /// request, they may take up a bunch of unnecessary bandwidth.
2340 ///
2341 /// Luckily filters can be uploaded to the server and reused using an unique
2342 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2343 /// method.
2344 ///
2345 /// # Arguments
2346 ///
2347 /// * `sync_settings` - Settings for the sync call, this allows us to set
2348 /// various options to configure the sync:
2349 /// * [`filter`] - To configure which events we receive and which get
2350 /// [filtered] by the server
2351 /// * [`timeout`] - To configure our [long polling] setup.
2352 /// * [`token`] - To tell the server which events we already received
2353 /// and where we wish to continue syncing.
2354 /// * [`full_state`] - To tell the server that we wish to receive all
2355 /// state events, regardless of our configured [`token`].
2356 /// * [`set_presence`] - To tell the server to set the presence and to
2357 /// which state.
2358 ///
2359 /// # Examples
2360 ///
2361 /// ```no_run
2362 /// # use url::Url;
2363 /// # async {
2364 /// # let homeserver = Url::parse("http://localhost:8080")?;
2365 /// # let username = "";
2366 /// # let password = "";
2367 /// use matrix_sdk::{
2368 /// Client, config::SyncSettings,
2369 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2370 /// };
2371 ///
2372 /// let client = Client::new(homeserver).await?;
2373 /// client.matrix_auth().login_username(username, password).send().await?;
2374 ///
2375 /// // Sync once so we receive the client state and old messages.
2376 /// client.sync_once(SyncSettings::default()).await?;
2377 ///
2378 /// // Register our handler so we start responding once we receive a new
2379 /// // event.
2380 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2381 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2382 /// });
2383 ///
2384 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2385 /// // from our `sync_once()` call automatically.
2386 /// client.sync(SyncSettings::default()).await;
2387 /// # anyhow::Ok(()) };
2388 /// ```
2389 ///
2390 /// [`sync`]: #method.sync
2391 /// [`SyncSettings`]: crate::config::SyncSettings
2392 /// [`token`]: crate::config::SyncSettings#method.token
2393 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2394 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2395 /// [`set_presence`]: ruma::presence::PresenceState
2396 /// [`filter`]: crate::config::SyncSettings#method.filter
2397 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2398 /// [`next_batch`]: SyncResponse#structfield.next_batch
2399 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2400 /// [long polling]: #long-polling
2401 /// [filtered]: #filtering-events
2402 #[instrument(skip(self))]
2403 pub async fn sync_once(
2404 &self,
2405 sync_settings: crate::config::SyncSettings,
2406 ) -> Result<SyncResponse> {
2407 // The sync might not return for quite a while due to the timeout.
2408 // We'll see if there's anything crypto related to send out before we
2409 // sync, i.e. if we closed our client after a sync but before the
2410 // crypto requests were sent out.
2411 //
2412 // This will mostly be a no-op.
2413 #[cfg(feature = "e2e-encryption")]
2414 if let Err(e) = self.send_outgoing_requests().await {
2415 error!(error = ?e, "Error while sending outgoing E2EE requests");
2416 }
2417
2418 let token = match sync_settings.token {
2419 SyncToken::Specific(token) => Some(token),
2420 SyncToken::NoToken => None,
2421 SyncToken::ReusePrevious => self.sync_token().await,
2422 };
2423
2424 let request = assign!(sync_events::v3::Request::new(), {
2425 filter: sync_settings.filter.map(|f| *f),
2426 since: token,
2427 full_state: sync_settings.full_state,
2428 set_presence: sync_settings.set_presence,
2429 timeout: sync_settings.timeout,
2430 use_state_after: true,
2431 });
2432 let mut request_config = self.request_config();
2433 if let Some(timeout) = sync_settings.timeout {
2434 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2435 request_config.timeout = Some(base_timeout + timeout);
2436 }
2437
2438 let response = self.send(request).with_request_config(request_config).await?;
2439 let next_batch = response.next_batch.clone();
2440 let response = self.process_sync(response).await?;
2441
2442 #[cfg(feature = "e2e-encryption")]
2443 if let Err(e) = self.send_outgoing_requests().await {
2444 error!(error = ?e, "Error while sending outgoing E2EE requests");
2445 }
2446
2447 self.inner.sync_beat.notify(usize::MAX);
2448
2449 Ok(SyncResponse::new(next_batch, response))
2450 }
2451
2452 /// Repeatedly synchronize the client state with the server.
2453 ///
2454 /// This method will only return on error, if cancellation is needed
2455 /// the method should be wrapped in a cancelable task or the
2456 /// [`Client::sync_with_callback`] method can be used or
2457 /// [`Client::sync_with_result_callback`] if you want to handle error
2458 /// cases in the loop, too.
2459 ///
2460 /// This method will internally call [`Client::sync_once`] in a loop.
2461 ///
2462 /// This method can be used with the [`Client::add_event_handler`]
2463 /// method to react to individual events. If you instead wish to handle
2464 /// events in a bulk manner the [`Client::sync_with_callback`],
2465 /// [`Client::sync_with_result_callback`] and
2466 /// [`Client::sync_stream`] methods can be used instead. Those methods
2467 /// repeatedly return the whole sync response.
2468 ///
2469 /// # Arguments
2470 ///
2471 /// * `sync_settings` - Settings for the sync call. *Note* that those
2472 /// settings will be only used for the first sync call. See the argument
2473 /// docs for [`Client::sync_once`] for more info.
2474 ///
2475 /// # Return
2476 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2477 /// up to the user of the API to check the error and decide whether the sync
2478 /// should continue or not.
2479 ///
2480 /// # Examples
2481 ///
2482 /// ```no_run
2483 /// # use url::Url;
2484 /// # async {
2485 /// # let homeserver = Url::parse("http://localhost:8080")?;
2486 /// # let username = "";
2487 /// # let password = "";
2488 /// use matrix_sdk::{
2489 /// Client, config::SyncSettings,
2490 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2491 /// };
2492 ///
2493 /// let client = Client::new(homeserver).await?;
2494 /// client.matrix_auth().login_username(&username, &password).send().await?;
2495 ///
2496 /// // Register our handler so we start responding once we receive a new
2497 /// // event.
2498 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2499 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2500 /// });
2501 ///
2502 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2503 /// // automatically.
2504 /// client.sync(SyncSettings::default()).await?;
2505 /// # anyhow::Ok(()) };
2506 /// ```
2507 ///
2508 /// [argument docs]: #method.sync_once
2509 /// [`sync_with_callback`]: #method.sync_with_callback
2510 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2511 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2512 }
2513
2514 /// Repeatedly call sync to synchronize the client state with the server.
2515 ///
2516 /// # Arguments
2517 ///
2518 /// * `sync_settings` - Settings for the sync call. *Note* that those
2519 /// settings will be only used for the first sync call. See the argument
2520 /// docs for [`Client::sync_once`] for more info.
2521 ///
2522 /// * `callback` - A callback that will be called every time a successful
2523 /// response has been fetched from the server. The callback must return a
2524 /// boolean which signalizes if the method should stop syncing. If the
2525 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2526 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2527 ///
2528 /// # Return
2529 /// The sync runs until an error occurs or the
2530 /// callback indicates that the Loop should stop. If the callback asked for
2531 /// a regular stop, the result will be `Ok(())` otherwise the
2532 /// `Err(Error)` is returned.
2533 ///
2534 /// # Examples
2535 ///
2536 /// The following example demonstrates how to sync forever while sending all
2537 /// the interesting events through a mpsc channel to another thread e.g. a
2538 /// UI thread.
2539 ///
2540 /// ```no_run
2541 /// # use std::time::Duration;
2542 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2543 /// # use url::Url;
2544 /// # async {
2545 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2546 /// # let mut client = Client::new(homeserver).await.unwrap();
2547 ///
2548 /// use tokio::sync::mpsc::channel;
2549 ///
2550 /// let (tx, rx) = channel(100);
2551 ///
2552 /// let sync_channel = &tx;
2553 /// let sync_settings = SyncSettings::new()
2554 /// .timeout(Duration::from_secs(30));
2555 ///
2556 /// client
2557 /// .sync_with_callback(sync_settings, |response| async move {
2558 /// let channel = sync_channel;
2559 /// for (room_id, room) in response.rooms.joined {
2560 /// for event in room.timeline.events {
2561 /// channel.send(event).await.unwrap();
2562 /// }
2563 /// }
2564 ///
2565 /// LoopCtrl::Continue
2566 /// })
2567 /// .await;
2568 /// };
2569 /// ```
2570 #[instrument(skip_all)]
2571 pub async fn sync_with_callback<C>(
2572 &self,
2573 sync_settings: crate::config::SyncSettings,
2574 callback: impl Fn(SyncResponse) -> C,
2575 ) -> Result<(), Error>
2576 where
2577 C: Future<Output = LoopCtrl>,
2578 {
2579 self.sync_with_result_callback(sync_settings, |result| async {
2580 Ok(callback(result?).await)
2581 })
2582 .await
2583 }
2584
2585 /// Repeatedly call sync to synchronize the client state with the server.
2586 ///
2587 /// # Arguments
2588 ///
2589 /// * `sync_settings` - Settings for the sync call. *Note* that those
2590 /// settings will be only used for the first sync call. See the argument
2591 /// docs for [`Client::sync_once`] for more info.
2592 ///
2593 /// * `callback` - A callback that will be called every time after a
2594 /// response has been received, failure or not. The callback returns a
2595 /// `Result<LoopCtrl, Error>`, too. When returning
2596 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2597 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2598 /// function returns `Ok(())`. In case the callback can't handle the
2599 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2600 /// which results in the sync ending and the `Err(Error)` being returned.
2601 ///
2602 /// # Return
2603 /// The sync runs until an error occurs that the callback can't handle or
2604 /// the callback indicates that the Loop should stop. If the callback
2605 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2606 /// `Err(Error)` is returned.
2607 ///
2608 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2609 /// this, and are handled first without sending the result to the
2610 /// callback. Only after they have exceeded is the `Result` handed to
2611 /// the callback.
2612 ///
2613 /// # Examples
2614 ///
2615 /// The following example demonstrates how to sync forever while sending all
2616 /// the interesting events through a mpsc channel to another thread e.g. a
2617 /// UI thread.
2618 ///
2619 /// ```no_run
2620 /// # use std::time::Duration;
2621 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2622 /// # use url::Url;
2623 /// # async {
2624 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2625 /// # let mut client = Client::new(homeserver).await.unwrap();
2626 /// #
2627 /// use tokio::sync::mpsc::channel;
2628 ///
2629 /// let (tx, rx) = channel(100);
2630 ///
2631 /// let sync_channel = &tx;
2632 /// let sync_settings = SyncSettings::new()
2633 /// .timeout(Duration::from_secs(30));
2634 ///
2635 /// client
2636 /// .sync_with_result_callback(sync_settings, |response| async move {
2637 /// let channel = sync_channel;
2638 /// let sync_response = response?;
2639 /// for (room_id, room) in sync_response.rooms.joined {
2640 /// for event in room.timeline.events {
2641 /// channel.send(event).await.unwrap();
2642 /// }
2643 /// }
2644 ///
2645 /// Ok(LoopCtrl::Continue)
2646 /// })
2647 /// .await;
2648 /// };
2649 /// ```
2650 #[instrument(skip(self, callback))]
2651 pub async fn sync_with_result_callback<C>(
2652 &self,
2653 sync_settings: crate::config::SyncSettings,
2654 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2655 ) -> Result<(), Error>
2656 where
2657 C: Future<Output = Result<LoopCtrl, Error>>,
2658 {
2659 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2660
2661 while let Some(result) = sync_stream.next().await {
2662 trace!("Running callback");
2663 if callback(result).await? == LoopCtrl::Break {
2664 trace!("Callback told us to stop");
2665 break;
2666 }
2667 trace!("Done running callback");
2668 }
2669
2670 Ok(())
2671 }
2672
/// Repeatedly synchronize the client state with the server.
///
/// This method will internally call [`Client::sync_once`] in a loop and is
/// equivalent to the [`Client::sync`] method but the responses are provided
/// as an async stream.
///
/// # Arguments
///
/// * `sync_settings` - Settings for the sync call. *Note* that those
///   settings will be only used for the first sync call. See the argument
///   docs for [`Client::sync_once`] for more info.
///
/// # Examples
///
/// ```no_run
/// # use url::Url;
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let username = "";
/// # let password = "";
/// use futures_util::StreamExt;
/// use matrix_sdk::{Client, config::SyncSettings};
///
/// let client = Client::new(homeserver).await?;
/// client.matrix_auth().login_username(&username, &password).send().await?;
///
/// let mut sync_stream =
///     Box::pin(client.sync_stream(SyncSettings::default()).await);
///
/// while let Some(Ok(response)) = sync_stream.next().await {
///     for room in response.rooms.joined.values() {
///         for e in &room.timeline.events {
///             if let Ok(event) = e.raw().deserialize() {
///                 println!("Received event {:?}", event);
///             }
///         }
///     }
/// }
///
/// # anyhow::Ok(()) };
/// ```
#[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
#[instrument(skip(self))]
pub async fn sync_stream(
    &self,
    mut sync_settings: crate::config::SyncSettings,
) -> impl Stream<Item = Result<SyncResponse>> + '_ {
    // Bookkeeping for the "ignore the timeout on the first sync" behaviour:
    // `timeout` stashes the configured timeout while it is removed from the
    // settings.
    let mut is_first_sync = true;
    let mut timeout = None;
    // Handed to `Client::delay_sync` after every iteration.
    let mut last_sync_time: Option<Instant> = None;

    // Captured here so that every iteration of the stream is instrumented
    // with the span that was current when the stream was created.
    let parent_span = Span::current();

    async_stream::stream!({
        loop {
            trace!("Syncing");

            if sync_settings.ignore_timeout_on_first_sync {
                if is_first_sync {
                    // First iteration: move the timeout out of the settings
                    // so this request is sent without it.
                    timeout = sync_settings.timeout.take();
                } else if sync_settings.timeout.is_none() && timeout.is_some() {
                    // Later iteration: put the stashed timeout back, unless
                    // the settings already carry one again.
                    sync_settings.timeout = timeout.take();
                }

                is_first_sync = false;
            }

            // Each item of the stream is the result of one helper call,
            // successful or not.
            yield self
                .sync_loop_helper(&mut sync_settings)
                .instrument(parent_span.clone())
                .await;

            // NOTE(review): presumably throttles back-to-back syncs based on
            // `last_sync_time` — confirm against `Client::delay_sync`.
            Client::delay_sync(&mut last_sync_time).await
        }
    })
}
2749
2750 /// Get the current, if any, sync token of the client.
2751 /// This will be None if the client didn't sync at least once.
2752 pub(crate) async fn sync_token(&self) -> Option<String> {
2753 self.inner.base_client.sync_token().await
2754 }
2755
2756 /// Gets information about the owner of a given access token.
2757 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2758 let request = whoami::v3::Request::new();
2759 self.send(request).await
2760 }
2761
2762 /// Subscribes a new receiver to client SessionChange broadcasts.
2763 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2764 let broadcast = &self.auth_ctx().session_change_sender;
2765 broadcast.subscribe()
2766 }
2767
2768 /// Sets the save/restore session callbacks.
2769 ///
2770 /// This is another mechanism to get synchronous updates to session tokens,
2771 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2772 pub fn set_session_callbacks(
2773 &self,
2774 reload_session_callback: Box<ReloadSessionCallback>,
2775 save_session_callback: Box<SaveSessionCallback>,
2776 ) -> Result<()> {
2777 self.inner
2778 .auth_ctx
2779 .reload_session_callback
2780 .set(reload_session_callback)
2781 .map_err(|_| Error::MultipleSessionCallbacks)?;
2782
2783 self.inner
2784 .auth_ctx
2785 .save_session_callback
2786 .set(save_session_callback)
2787 .map_err(|_| Error::MultipleSessionCallbacks)?;
2788
2789 Ok(())
2790 }
2791
2792 /// Get the notification settings of the current owner of the client.
2793 pub async fn notification_settings(&self) -> NotificationSettings {
2794 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2795 NotificationSettings::new(self.clone(), ruleset)
2796 }
2797
/// Create a new specialized `Client` that can process notifications.
///
/// The returned client is built around a fresh `ClientInner` that reuses
/// this client's authentication context, HTTP client, event cache, send
/// queue data and latest events, but gets its own base client created by
/// `clone_with_in_memory_state_store`.
///
/// See [`CrossProcessLock::new`] to learn more about
/// `cross_process_store_locks_holder_name`.
///
/// # Errors
///
/// Fails if cloning the base client with an in-memory state store fails.
///
/// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
pub async fn notification_client(
    &self,
    cross_process_store_locks_holder_name: String,
) -> Result<Client> {
    // The arguments below are positional: their order must match the
    // parameter list of `ClientInner::new`, including the cfg-gated ones.
    let client = Client {
        inner: ClientInner::new(
            self.inner.auth_ctx.clone(),
            self.server().cloned(),
            self.homeserver(),
            self.sliding_sync_version(),
            self.inner.http_client.clone(),
            // NOTE(review): the meaning of the `false` flag is not visible
            // here — confirm against `clone_with_in_memory_state_store`.
            self.inner
                .base_client
                .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
                .await?,
            // Snapshot of the currently cached server info.
            self.inner.caches.server_info.read().await.clone(),
            self.inner.respect_login_well_known,
            self.inner.event_cache.clone(),
            self.inner.send_queue_data.clone(),
            self.inner.latest_events.clone(),
            #[cfg(feature = "e2e-encryption")]
            self.inner.e2ee.encryption_settings,
            #[cfg(feature = "e2e-encryption")]
            self.inner.enable_share_history_on_invite,
            // Moved here; it was only borrowed for the base client above.
            cross_process_store_locks_holder_name,
            #[cfg(feature = "experimental-search")]
            self.inner.search_index.clone(),
            self.inner.thread_subscription_catchup.clone(),
        )
        .await,
    };

    Ok(client)
}
2838
2839 /// The [`EventCache`] instance for this [`Client`].
2840 pub fn event_cache(&self) -> &EventCache {
2841 // SAFETY: always initialized in the `Client` ctor.
2842 self.inner.event_cache.get().unwrap()
2843 }
2844
2845 /// The [`LatestEvents`] instance for this [`Client`].
2846 pub async fn latest_events(&self) -> &LatestEvents {
2847 self.inner
2848 .latest_events
2849 .get_or_init(|| async {
2850 LatestEvents::new(
2851 WeakClient::from_client(self),
2852 self.event_cache().clone(),
2853 SendQueue::new(self.clone()),
2854 )
2855 })
2856 .await
2857 }
2858
2859 /// Waits until an at least partially synced room is received, and returns
2860 /// it.
2861 ///
2862 /// **Note: this function will loop endlessly until either it finds the room
2863 /// or an externally set timeout happens.**
2864 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2865 loop {
2866 if let Some(room) = self.get_room(room_id) {
2867 if room.is_state_partially_or_fully_synced() {
2868 debug!("Found just created room!");
2869 return room;
2870 }
2871 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2872 } else {
2873 debug!("Room wasn't found, waiting for sync beat to try again");
2874 }
2875 self.inner.sync_beat.listen().await;
2876 }
2877 }
2878
2879 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2880 /// join it.
2881 pub async fn knock(
2882 &self,
2883 room_id_or_alias: OwnedRoomOrAliasId,
2884 reason: Option<String>,
2885 server_names: Vec<OwnedServerName>,
2886 ) -> Result<Room> {
2887 let request =
2888 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2889 let response = self.send(request).await?;
2890 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2891 Ok(Room::new(self.clone(), base_room))
2892 }
2893
2894 /// Checks whether the provided `user_id` belongs to an ignored user.
2895 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2896 self.base_client().is_user_ignored(user_id).await
2897 }
2898
2899 /// Gets the `max_upload_size` value from the homeserver, getting either a
2900 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2901 /// missing.
2902 ///
2903 /// Check the spec for more info:
2904 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2905 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2906 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2907 if let Some(data) = max_upload_size_lock.get() {
2908 return Ok(data.to_owned());
2909 }
2910
2911 // Use the authenticated endpoint when the server supports it.
2912 let supported_versions = self.supported_versions().await?;
2913 let use_auth =
2914 authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2915
2916 let upload_size = if use_auth {
2917 self.send(authenticated_media::get_media_config::v1::Request::default())
2918 .await?
2919 .upload_size
2920 } else {
2921 #[allow(deprecated)]
2922 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2923 };
2924
2925 match max_upload_size_lock.set(upload_size) {
2926 Ok(_) => Ok(upload_size),
2927 Err(error) => {
2928 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2929 }
2930 }
2931 }
2932
/// The settings to use for decrypting events, as held by the base client.
#[cfg(feature = "e2e-encryption")]
pub fn decryption_settings(&self) -> &DecryptionSettings {
    let base_client = self.base_client();
    &base_client.decryption_settings
}
2938
/// Returns the [`SearchIndex`] for this [`Client`].
#[cfg(feature = "experimental-search")]
pub fn search_index(&self) -> &SearchIndex {
    let inner = &self.inner;
    &inner.search_index
}
2944
2945 /// Whether the client is configured to take thread subscriptions (MSC4306
2946 /// and MSC4308) into account.
2947 ///
2948 /// This may cause filtering out of thread subscriptions, and loading the
2949 /// thread subscriptions via the sliding sync extension, when the room
2950 /// list service is being used.
2951 pub fn enabled_thread_subscriptions(&self) -> bool {
2952 match self.base_client().threading_support {
2953 ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2954 ThreadingSupport::Disabled => false,
2955 }
2956 }
2957
2958 /// Fetch thread subscriptions changes between `from` and up to `to`.
2959 ///
2960 /// The `limit` optional parameter can be used to limit the number of
2961 /// entries in a response. It can also be overridden by the server, if
2962 /// it's deemed too large.
2963 pub async fn fetch_thread_subscriptions(
2964 &self,
2965 from: Option<String>,
2966 to: Option<String>,
2967 limit: Option<UInt>,
2968 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2969 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2970 from,
2971 to,
2972 limit,
2973 });
2974 Ok(self.send(request).await?)
2975 }
2976
2977 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
2978 self.inner.thread_subscription_catchup.get().unwrap()
2979 }
2980}
2981
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper to mark users as tracked by the crypto layer.
    ///
    /// # Panics
    ///
    /// Panics if the client has no Olm machine, or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();

        machine.update_tracked_users(user_ids).await.unwrap();
    }
}
2995
/// A weak reference to the inner client, useful when trying to get a handle
/// on the owning client.
///
/// Holding a `WeakClient` does not keep the client alive: the handle has to
/// be upgraded to use the client, and upgrading fails once every strong
/// reference is gone.
#[derive(Clone, Debug)]
pub(crate) struct WeakClient {
    /// Non-owning pointer to the shared client state.
    client: Weak<ClientInner>,
}
3002
3003impl WeakClient {
3004 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3005 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3006 Self { client: Arc::downgrade(client) }
3007 }
3008
3009 /// Construct a [`WeakClient`] from a [`Client`].
3010 pub fn from_client(client: &Client) -> Self {
3011 Self::from_inner(&client.inner)
3012 }
3013
3014 /// Attempts to get a [`Client`] from this [`WeakClient`].
3015 pub fn get(&self) -> Option<Client> {
3016 self.client.upgrade().map(|inner| Client { inner })
3017 }
3018
3019 /// Gets the number of strong (`Arc`) pointers still pointing to this
3020 /// client.
3021 #[allow(dead_code)]
3022 pub fn strong_count(&self) -> usize {
3023 self.client.strong_count()
3024 }
3025}
3026
/// Server information kept by the client; each field tracks separately
/// whether it has been cached yet.
#[derive(Clone)]
struct ClientServerInfo {
    /// The Matrix versions and unstable features the server supports (known
    /// ones only).
    supported_versions: CachedValue<SupportedVersions>,

    /// The server's well-known file, if any.
    ///
    /// `Cached(None)` means the file is known to be absent, while `NotSet`
    /// means it has not been cached yet.
    well_known: CachedValue<Option<WellKnownResponse>>,
}
3036
/// A value that is either cached or not cached yet.
///
/// This exists to avoid confusion between a value that is set to `None`
/// (because it doesn't exist) and a value that has not been cached yet.
#[derive(Clone)]
enum CachedValue<Value> {
    /// A value has been cached.
    Cached(Value),
    /// Nothing has been cached yet.
    NotSet,
}

impl<Value> CachedValue<Value> {
    /// Takes the cached value out of the wrapper.
    ///
    /// # Panics
    ///
    /// Panics if nothing has been cached yet.
    fn unwrap_cached_value(self) -> Value {
        if let CachedValue::Cached(inner) = self {
            inner
        } else {
            panic!("Tried to unwrap a cached value that wasn't set")
        }
    }

    /// Borrows the content: turns `&CachedValue<Value>` into
    /// `CachedValue<&Value>`.
    fn as_ref(&self) -> CachedValue<&Value> {
        match self {
            Self::NotSet => CachedValue::NotSet,
            Self::Cached(inner) => CachedValue::Cached(inner),
        }
    }

    /// Applies `f` to the cached value, if there is one, producing a
    /// `CachedValue<Other>`; `NotSet` stays `NotSet` and `f` is not called.
    fn map<Other, F>(self, f: F) -> CachedValue<Other>
    where
        F: FnOnce(Value) -> Other,
    {
        if let Self::Cached(inner) = self {
            CachedValue::Cached(f(inner))
        } else {
            CachedValue::NotSet
        }
    }
}
3082
/// Information about the state of a room before we joined it.
///
/// The `Default` value has no inviter.
#[derive(Debug, Clone, Default)]
struct PreJoinRoomInfo {
    /// The user who invited us to the room, if any.
    pub inviter: Option<RoomMember>,
}
3089
3090// The http mocking library is not supported for wasm32
3091#[cfg(all(test, not(target_family = "wasm")))]
3092pub(crate) mod tests {
3093 use std::{sync::Arc, time::Duration};
3094
3095 use assert_matches::assert_matches;
3096 use assert_matches2::assert_let;
3097 use eyeball::SharedObservable;
3098 use futures_util::{FutureExt, StreamExt, pin_mut};
3099 use js_int::{UInt, uint};
3100 use matrix_sdk_base::{
3101 RoomState,
3102 store::{MemoryStore, StoreConfig},
3103 };
3104 use matrix_sdk_test::{
3105 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3106 event_factory::EventFactory,
3107 };
3108 #[cfg(target_family = "wasm")]
3109 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3110
3111 use ruma::{
3112 RoomId, ServerName, UserId,
3113 api::{
3114 FeatureFlag, MatrixVersion,
3115 client::{
3116 discovery::discover_homeserver::RtcFocusInfo,
3117 room::create_room::v3::Request as CreateRoomRequest,
3118 },
3119 },
3120 assign,
3121 events::{
3122 ignored_user_list::IgnoredUserListEventContent,
3123 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3124 },
3125 owned_room_id, owned_user_id, room_alias_id, room_id,
3126 };
3127 use serde_json::json;
3128 use stream_assert::{assert_next_matches, assert_pending};
3129 use tokio::{
3130 spawn,
3131 time::{sleep, timeout},
3132 };
3133 use url::Url;
3134
3135 use super::Client;
3136 use crate::{
3137 Error, TransmissionProgress,
3138 client::{WeakClient, futures::SendMediaUploadRequest},
3139 config::{RequestConfig, SyncSettings},
3140 futures::SendRequest,
3141 media::MediaError,
3142 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3143 };
3144
3145 #[async_test]
3146 async fn test_account_data() {
3147 let server = MatrixMockServer::new().await;
3148 let client = server.client_builder().build().await;
3149
3150 let f = EventFactory::new();
3151 server
3152 .mock_sync()
3153 .ok_and_run(&client, |builder| {
3154 builder.add_global_account_data(
3155 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3156 );
3157 })
3158 .await;
3159
3160 let content = client
3161 .account()
3162 .account_data::<IgnoredUserListEventContent>()
3163 .await
3164 .unwrap()
3165 .unwrap()
3166 .deserialize()
3167 .unwrap();
3168
3169 assert_eq!(content.ignored_users.len(), 1);
3170 }
3171
3172 #[async_test]
3173 async fn test_successful_discovery() {
3174 // Imagine this is `matrix.org`.
3175 let server = MatrixMockServer::new().await;
3176 let server_url = server.uri();
3177
3178 // Imagine this is `matrix-client.matrix.org`.
3179 let homeserver = MatrixMockServer::new().await;
3180 let homeserver_url = homeserver.uri();
3181
3182 // Imagine Alice has the user ID `@alice:matrix.org`.
3183 let domain = server_url.strip_prefix("http://").unwrap();
3184 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3185
3186 // The `.well-known` is on the server (e.g. `matrix.org`).
3187 server
3188 .mock_well_known()
3189 .ok_with_homeserver_url(&homeserver_url)
3190 .mock_once()
3191 .named("well-known")
3192 .mount()
3193 .await;
3194
3195 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3196 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3197
3198 let client = Client::builder()
3199 .insecure_server_name_no_tls(alice.server_name())
3200 .build()
3201 .await
3202 .unwrap();
3203
3204 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3205 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3206 client.server_versions().await.unwrap();
3207 }
3208
3209 #[async_test]
3210 async fn test_discovery_broken_server() {
3211 let server = MatrixMockServer::new().await;
3212 let server_url = server.uri();
3213 let domain = server_url.strip_prefix("http://").unwrap();
3214 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3215
3216 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3217
3218 assert!(
3219 Client::builder()
3220 .insecure_server_name_no_tls(alice.server_name())
3221 .build()
3222 .await
3223 .is_err(),
3224 "Creating a client from a user ID should fail when the .well-known request fails."
3225 );
3226 }
3227
3228 #[async_test]
3229 async fn test_room_creation() {
3230 let server = MatrixMockServer::new().await;
3231 let client = server.client_builder().build().await;
3232
3233 server
3234 .mock_sync()
3235 .ok_and_run(&client, |builder| {
3236 builder.add_joined_room(
3237 JoinedRoomBuilder::default()
3238 .add_state_event(StateTestEvent::Member)
3239 .add_state_event(StateTestEvent::PowerLevels),
3240 );
3241 })
3242 .await;
3243
3244 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3245 assert_eq!(room.state(), RoomState::Joined);
3246 }
3247
3248 #[async_test]
3249 async fn test_retry_limit_http_requests() {
3250 let server = MatrixMockServer::new().await;
3251 let client = server
3252 .client_builder()
3253 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3254 .build()
3255 .await;
3256
3257 assert!(client.request_config().retry_limit.unwrap() == 3);
3258
3259 server.mock_login().error500().expect(3).mount().await;
3260
3261 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3262 }
3263
3264 #[async_test]
3265 async fn test_retry_timeout_http_requests() {
3266 // Keep this timeout small so that the test doesn't take long
3267 let retry_timeout = Duration::from_secs(5);
3268 let server = MatrixMockServer::new().await;
3269 let client = server
3270 .client_builder()
3271 .on_builder(|builder| {
3272 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3273 })
3274 .build()
3275 .await;
3276
3277 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3278
3279 server.mock_login().error500().expect(2..).mount().await;
3280
3281 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3282 }
3283
3284 #[async_test]
3285 async fn test_short_retry_initial_http_requests() {
3286 let server = MatrixMockServer::new().await;
3287 let client = server
3288 .client_builder()
3289 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3290 .build()
3291 .await;
3292
3293 server.mock_login().error500().expect(3..).mount().await;
3294
3295 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3296 }
3297
3298 #[async_test]
3299 async fn test_no_retry_http_requests() {
3300 let server = MatrixMockServer::new().await;
3301 let client = server.client_builder().build().await;
3302
3303 server.mock_devices().error500().mock_once().mount().await;
3304
3305 client.devices().await.unwrap_err();
3306 }
3307
3308 #[async_test]
3309 async fn test_set_homeserver() {
3310 let client = MockClientBuilder::new(None).build().await;
3311 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3312
3313 let homeserver = Url::parse("http://example.com/").unwrap();
3314 client.set_homeserver(homeserver.clone());
3315 assert_eq!(client.homeserver(), homeserver);
3316 }
3317
3318 #[async_test]
3319 async fn test_search_user_request() {
3320 let server = MatrixMockServer::new().await;
3321 let client = server.client_builder().build().await;
3322
3323 server.mock_user_directory().ok().mock_once().mount().await;
3324
3325 let response = client.search_users("test", 50).await.unwrap();
3326 assert_eq!(response.results.len(), 1);
3327 let result = &response.results[0];
3328 assert_eq!(result.user_id.to_string(), "@test:example.me");
3329 assert_eq!(result.display_name.clone().unwrap(), "Test");
3330 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3331 assert!(!response.limited);
3332 }
3333
3334 #[async_test]
3335 async fn test_request_unstable_features() {
3336 let server = MatrixMockServer::new().await;
3337 let client = server.client_builder().no_server_versions().build().await;
3338
3339 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3340
3341 let unstable_features = client.unstable_features().await.unwrap();
3342 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3343 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3344 }
3345
3346 #[async_test]
3347 async fn test_can_homeserver_push_encrypted_event_to_device() {
3348 let server = MatrixMockServer::new().await;
3349 let client = server.client_builder().no_server_versions().build().await;
3350
3351 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3352
3353 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3354 assert!(msc4028_enabled);
3355 }
3356
/// Exercises the recently-visited-rooms list: authentication requirement,
/// deduplication, most-recent-first ordering and the cap of 20 entries.
#[async_test]
async fn test_recently_visited_rooms() {
    // Tracking recently visited rooms requires authentication
    let client = MockClientBuilder::new(None).unlogged().build().await;
    assert_matches!(
        client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
        Err(Error::AuthenticationRequired)
    );

    // From here on, use a client built without `unlogged()`.
    let client = MockClientBuilder::new(None).build().await;
    let account = client.account();

    // We should start off with an empty list
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);

    // Tracking a valid room id should add it to the list
    account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
    assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

    // And the existing list shouldn't be changed (reading it again yields
    // the same content)
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
    assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

    // Tracking the same room again shouldn't change the list
    account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
    assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

    // Tracking a second room should add it to the front of the list
    account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
    assert_eq!(
        account.get_recently_visited_rooms().await.unwrap(),
        [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
    );

    // Tracking the first room yet again should move it to the front of the list
    account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
    assert_eq!(
        account.get_recently_visited_rooms().await.unwrap(),
        [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
    );

    // Tracking should be capped at 20: track 20 more rooms, `!0:localhost`
    // through `!19:localhost`, on top of the two already present.
    for n in 0..20 {
        account
            .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
            .await
            .unwrap();
    }

    assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);

    // And the initial rooms should've been pushed out
    let rooms = account.get_recently_visited_rooms().await.unwrap();
    assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
    assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));

    // And the last tracked room should be the first
    assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
}
3420
/// Checks that no strong reference to the client is left once the client
/// itself has been dropped, even after the event cache has been subscribed
/// and a room event cache has been requested.
#[async_test]
async fn test_client_no_cycle_with_event_cache() {
    let client = MockClientBuilder::new(None).build().await;

    // Wait for the init tasks to die.
    sleep(Duration::from_secs(1)).await;

    // At this point `client` must be the only strong reference.
    let weak_client = WeakClient::from_client(&client);
    assert_eq!(weak_client.strong_count(), 1);

    {
        let room_id = room_id!("!room:example.org");

        // Have the client know the room.
        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::new(room_id))
            .build_sync_response();
        client.inner.base_client.receive_sync_response(response).await.unwrap();

        client.event_cache().subscribe().unwrap();

        // Both values are dropped at the end of this scope, before the
        // client itself is dropped below.
        let (_room_event_cache, _drop_handles) =
            client.get_room(room_id).unwrap().event_cache().await.unwrap();
    }

    drop(client);

    // Give a bit of time for background tasks to die.
    sleep(Duration::from_secs(1)).await;

    // The weak client must be the last reference to the client now.
    assert_eq!(weak_client.strong_count(), 0);
    let client = weak_client.get();
    // The format arguments are only evaluated when the assertion fails, i.e.
    // when `client` is `Some`, so the `unwrap()` below cannot panic on `None`.
    assert!(
        client.is_none(),
        "too many strong references to the client: {}",
        Arc::strong_count(&client.unwrap().inner)
    );
}
3460
/// Checks that server info (supported versions and well-known data) is
/// served from the caches after the first fetch, that a second client
/// sharing the same state store does not hit `/versions` again, and that
/// resetting the server info triggers new requests.
#[async_test]
async fn test_server_info_caching() {
    let server = MatrixMockServer::new().await;
    let server_url = server.uri();
    let domain = server_url.strip_prefix("http://").unwrap();
    let server_name = <&ServerName>::try_from(domain).unwrap();
    let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];

    // Scoped mocks: their expectations are tied to the guards, which are
    // dropped explicitly further down.
    let well_known_mock = server
        .mock_well_known()
        .ok()
        .named("well known mock")
        .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
        .mount_as_scoped()
        .await;

    let versions_mock = server
        .mock_versions()
        .ok_with_unstable_features()
        .named("first versions mock")
        .expect(1)
        .mount_as_scoped()
        .await;

    // The same store instance is handed to both clients built in this test.
    let memory_store = Arc::new(MemoryStore::new());
    let client = Client::builder()
        .insecure_server_name_no_tls(server_name)
        .store_config(
            StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                .state_store(memory_store.clone()),
        )
        .build()
        .await
        .unwrap();

    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

    // These subsequent calls hit the in-memory cache.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    drop(client);

    // Second client, built on top of the same state store.
    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // These calls to the new client hit the state store cache (the shared
    // `MemoryStore` here, standing in for an on-disk store).
    assert!(
        client
            .unstable_features()
            .await
            .unwrap()
            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
    );
    let supported = client.supported_versions().await.unwrap();
    assert!(supported.versions.contains(&MatrixVersion::V1_0));
    assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));

    // Then this call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // Drop the scoped mocks before mounting the second set below.
    drop(versions_mock);
    drop(well_known_mock);

    // Now, reset the cache, and observe the endpoints being called again once.
    client.reset_server_info().await.unwrap();

    server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;

    server.mock_versions().ok().expect(1).named("second versions mock").mount().await;

    // Hits network again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    // Hits in-memory cache again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
}
3547
3548 #[async_test]
3549 async fn test_server_info_without_a_well_known() {
3550 let server = MatrixMockServer::new().await;
3551 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3552
3553 let versions_mock = server
3554 .mock_versions()
3555 .ok_with_unstable_features()
3556 .named("first versions mock")
3557 .expect(1)
3558 .mount_as_scoped()
3559 .await;
3560
3561 let memory_store = Arc::new(MemoryStore::new());
3562 let client = server
3563 .client_builder()
3564 .no_server_versions()
3565 .on_builder(|builder| {
3566 builder.store_config(
3567 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3568 .state_store(memory_store.clone()),
3569 )
3570 })
3571 .build()
3572 .await;
3573
3574 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3575
3576 // These subsequent calls hit the in-memory cache.
3577 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3578 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3579
3580 drop(client);
3581
3582 let client = server
3583 .client_builder()
3584 .no_server_versions()
3585 .on_builder(|builder| {
3586 builder.store_config(
3587 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3588 .state_store(memory_store.clone()),
3589 )
3590 })
3591 .build()
3592 .await;
3593
3594 // This call to the new client hits the on-disk cache.
3595 assert!(
3596 client
3597 .unstable_features()
3598 .await
3599 .unwrap()
3600 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3601 );
3602
3603 // Then this call hits the in-memory cache.
3604 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3605
3606 drop(versions_mock);
3607
3608 // Now, reset the cache, and observe the endpoints being called again once.
3609 client.reset_server_info().await.unwrap();
3610
3611 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3612
3613 // Hits network again.
3614 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3615 // Hits in-memory cache again.
3616 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3617 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3618 }
3619
3620 #[async_test]
3621 async fn test_no_network_doesnt_cause_infinite_retries() {
3622 // We want infinite retries for transient errors.
3623 let client = MockClientBuilder::new(None)
3624 .on_builder(|builder| builder.request_config(RequestConfig::new()))
3625 .build()
3626 .await;
3627
3628 // We don't define a mock server on purpose here, so that the error is really a
3629 // network error.
3630 client.whoami().await.unwrap_err();
3631 }
3632
3633 #[async_test]
3634 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3635 let server = MatrixMockServer::new().await;
3636 let client = server.client_builder().build().await;
3637
3638 let room_id = room_id!("!room:example.org");
3639
3640 server
3641 .mock_sync()
3642 .ok_and_run(&client, |builder| {
3643 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3644 })
3645 .await;
3646
3647 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3648 assert_eq!(room.room_id(), room_id);
3649 }
3650
3651 #[async_test]
3652 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3653 let server = MatrixMockServer::new().await;
3654 let client = server.client_builder().build().await;
3655
3656 let room_id = room_id!("!room:example.org");
3657
3658 let client = Arc::new(client);
3659
3660 // Perform the /sync request with a delay so it starts after the
3661 // `await_room_remote_echo` call has happened
3662 spawn({
3663 let client = client.clone();
3664 async move {
3665 sleep(Duration::from_millis(100)).await;
3666
3667 server
3668 .mock_sync()
3669 .ok_and_run(&client, |builder| {
3670 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3671 })
3672 .await;
3673 }
3674 });
3675
3676 let room =
3677 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3678 assert_eq!(room.room_id(), room_id);
3679 }
3680
3681 #[async_test]
3682 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3683 let client = MockClientBuilder::new(None).build().await;
3684
3685 let room_id = room_id!("!room:example.org");
3686 // Room is not present so the client won't be able to find it. The call will
3687 // timeout.
3688 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3689 }
3690
3691 #[async_test]
3692 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3693 let server = MatrixMockServer::new().await;
3694 let client = server.client_builder().build().await;
3695
3696 server.mock_create_room().ok().mount().await;
3697
3698 // Create a room in the internal store
3699 let room = client
3700 .create_room(assign!(CreateRoomRequest::new(), {
3701 invite: vec![],
3702 is_direct: false,
3703 }))
3704 .await
3705 .unwrap();
3706
3707 // Room is locally present, but not synced, the call will timeout
3708 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3709 .await
3710 .unwrap_err();
3711 }
3712
3713 #[async_test]
3714 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3715 let server = MatrixMockServer::new().await;
3716 let client = server.client_builder().build().await;
3717
3718 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3719
3720 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3721 assert_matches!(ret, Ok(true));
3722 }
3723
3724 #[async_test]
3725 async fn test_is_room_alias_available_if_alias_is_resolved() {
3726 let server = MatrixMockServer::new().await;
3727 let client = server.client_builder().build().await;
3728
3729 server
3730 .mock_room_directory_resolve_alias()
3731 .ok("!some_room_id:matrix.org", Vec::new())
3732 .expect(1)
3733 .mount()
3734 .await;
3735
3736 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3737 assert_matches!(ret, Ok(false));
3738 }
3739
3740 #[async_test]
3741 async fn test_is_room_alias_available_if_error_found() {
3742 let server = MatrixMockServer::new().await;
3743 let client = server.client_builder().build().await;
3744
3745 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3746
3747 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3748 assert_matches!(ret, Err(_));
3749 }
3750
3751 #[async_test]
3752 async fn test_create_room_alias() {
3753 let server = MatrixMockServer::new().await;
3754 let client = server.client_builder().build().await;
3755
3756 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3757
3758 let ret = client
3759 .create_room_alias(
3760 room_alias_id!("#some_alias:matrix.org"),
3761 room_id!("!some_room:matrix.org"),
3762 )
3763 .await;
3764 assert_matches!(ret, Ok(()));
3765 }
3766
3767 #[async_test]
3768 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3769 let server = MatrixMockServer::new().await;
3770 let client = server.client_builder().build().await;
3771
3772 let room_id = room_id!("!a-room:matrix.org");
3773
3774 // Make sure the summary endpoint is called once
3775 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3776
3777 // We create a locally cached invited room
3778 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3779
3780 // And we get a preview, the server endpoint was reached
3781 let preview = client
3782 .get_room_preview(room_id.into(), Vec::new())
3783 .await
3784 .expect("Room preview should be retrieved");
3785
3786 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3787 }
3788
3789 #[async_test]
3790 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3791 let server = MatrixMockServer::new().await;
3792 let client = server.client_builder().build().await;
3793
3794 let room_id = room_id!("!a-room:matrix.org");
3795
3796 // Make sure the summary endpoint is called once
3797 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3798
3799 // We create a locally cached left room
3800 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3801
3802 // And we get a preview, the server endpoint was reached
3803 let preview = client
3804 .get_room_preview(room_id.into(), Vec::new())
3805 .await
3806 .expect("Room preview should be retrieved");
3807
3808 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3809 }
3810
3811 #[async_test]
3812 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3813 let server = MatrixMockServer::new().await;
3814 let client = server.client_builder().build().await;
3815
3816 let room_id = room_id!("!a-room:matrix.org");
3817
3818 // Make sure the summary endpoint is called once
3819 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3820
3821 // We create a locally cached knocked room
3822 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3823
3824 // And we get a preview, the server endpoint was reached
3825 let preview = client
3826 .get_room_preview(room_id.into(), Vec::new())
3827 .await
3828 .expect("Room preview should be retrieved");
3829
3830 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3831 }
3832
3833 #[async_test]
3834 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3835 let server = MatrixMockServer::new().await;
3836 let client = server.client_builder().build().await;
3837
3838 let room_id = room_id!("!a-room:matrix.org");
3839
3840 // Make sure the summary endpoint is not called
3841 server.mock_room_summary().ok(room_id).never().mount().await;
3842
3843 // We create a locally cached joined room
3844 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3845
3846 // And we get a preview, no server endpoint was reached
3847 let preview = client
3848 .get_room_preview(room_id.into(), Vec::new())
3849 .await
3850 .expect("Room preview should be retrieved");
3851
3852 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3853 }
3854
3855 #[async_test]
3856 async fn test_media_preview_config() {
3857 let server = MatrixMockServer::new().await;
3858 let client = server.client_builder().build().await;
3859
3860 server
3861 .mock_sync()
3862 .ok_and_run(&client, |builder| {
3863 builder.add_custom_global_account_data(json!({
3864 "content": {
3865 "media_previews": "private",
3866 "invite_avatars": "off"
3867 },
3868 "type": "m.media_preview_config"
3869 }));
3870 })
3871 .await;
3872
3873 let (initial_value, stream) =
3874 client.account().observe_media_preview_config().await.unwrap();
3875
3876 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3877 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3878 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3879 pin_mut!(stream);
3880 assert_pending!(stream);
3881
3882 server
3883 .mock_sync()
3884 .ok_and_run(&client, |builder| {
3885 builder.add_custom_global_account_data(json!({
3886 "content": {
3887 "media_previews": "off",
3888 "invite_avatars": "on"
3889 },
3890 "type": "m.media_preview_config"
3891 }));
3892 })
3893 .await;
3894
3895 assert_next_matches!(
3896 stream,
3897 MediaPreviewConfigEventContent {
3898 media_previews: Some(MediaPreviews::Off),
3899 invite_avatars: Some(InviteAvatars::On),
3900 ..
3901 }
3902 );
3903 assert_pending!(stream);
3904 }
3905
3906 #[async_test]
3907 async fn test_unstable_media_preview_config() {
3908 let server = MatrixMockServer::new().await;
3909 let client = server.client_builder().build().await;
3910
3911 server
3912 .mock_sync()
3913 .ok_and_run(&client, |builder| {
3914 builder.add_custom_global_account_data(json!({
3915 "content": {
3916 "media_previews": "private",
3917 "invite_avatars": "off"
3918 },
3919 "type": "io.element.msc4278.media_preview_config"
3920 }));
3921 })
3922 .await;
3923
3924 let (initial_value, stream) =
3925 client.account().observe_media_preview_config().await.unwrap();
3926
3927 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3928 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3929 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3930 pin_mut!(stream);
3931 assert_pending!(stream);
3932
3933 server
3934 .mock_sync()
3935 .ok_and_run(&client, |builder| {
3936 builder.add_custom_global_account_data(json!({
3937 "content": {
3938 "media_previews": "off",
3939 "invite_avatars": "on"
3940 },
3941 "type": "io.element.msc4278.media_preview_config"
3942 }));
3943 })
3944 .await;
3945
3946 assert_next_matches!(
3947 stream,
3948 MediaPreviewConfigEventContent {
3949 media_previews: Some(MediaPreviews::Off),
3950 invite_avatars: Some(InviteAvatars::On),
3951 ..
3952 }
3953 );
3954 assert_pending!(stream);
3955 }
3956
3957 #[async_test]
3958 async fn test_media_preview_config_not_found() {
3959 let server = MatrixMockServer::new().await;
3960 let client = server.client_builder().build().await;
3961
3962 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3963
3964 assert!(initial_value.is_none());
3965 }
3966
3967 #[async_test]
3968 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3969 // The default Matrix version we use is 1.11 or higher, so authenticated media
3970 // is supported.
3971 let server = MatrixMockServer::new().await;
3972 let client = server.client_builder().build().await;
3973
3974 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3975
3976 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3977 client.load_or_fetch_max_upload_size().await.unwrap();
3978
3979 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3980 }
3981
3982 #[async_test]
3983 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3984 // The server must advertise support for the stable feature for authenticated
3985 // media support, so we mock the `GET /versions` response.
3986 let server = MatrixMockServer::new().await;
3987 let client = server.client_builder().no_server_versions().build().await;
3988
3989 server
3990 .mock_versions()
3991 .ok_custom(
3992 &["v1.7", "v1.8", "v1.9", "v1.10"],
3993 &[("org.matrix.msc3916.stable", true)].into(),
3994 )
3995 .named("versions")
3996 .expect(1)
3997 .mount()
3998 .await;
3999
4000 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4001
4002 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4003 client.load_or_fetch_max_upload_size().await.unwrap();
4004
4005 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4006 }
4007
4008 #[async_test]
4009 async fn test_load_or_fetch_max_upload_size_no_auth() {
4010 // The server must not support Matrix 1.11 or higher for unauthenticated
4011 // media requests, so we mock the `GET /versions` response.
4012 let server = MatrixMockServer::new().await;
4013 let client = server.client_builder().no_server_versions().build().await;
4014
4015 server
4016 .mock_versions()
4017 .ok_custom(&["v1.1"], &Default::default())
4018 .named("versions")
4019 .expect(1)
4020 .mount()
4021 .await;
4022
4023 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4024
4025 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4026 client.load_or_fetch_max_upload_size().await.unwrap();
4027
4028 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4029 }
4030
4031 #[async_test]
4032 async fn test_uploading_a_too_large_media_file() {
4033 let server = MatrixMockServer::new().await;
4034 let client = server.client_builder().build().await;
4035
4036 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4037 client.load_or_fetch_max_upload_size().await.unwrap();
4038 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4039
4040 let data = vec![1, 2];
4041 let upload_request =
4042 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4043 let request = SendRequest {
4044 client: client.clone(),
4045 request: upload_request,
4046 config: None,
4047 send_progress: SharedObservable::new(TransmissionProgress::default()),
4048 };
4049 let media_request = SendMediaUploadRequest::new(request);
4050
4051 let error = media_request.await.err();
4052 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4053 assert_eq!(max, uint!(1));
4054 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4055 }
4056
4057 #[async_test]
4058 async fn test_dont_ignore_timeout_on_first_sync() {
4059 let server = MatrixMockServer::new().await;
4060 let client = server.client_builder().build().await;
4061
4062 server
4063 .mock_sync()
4064 .timeout(Some(Duration::from_secs(30)))
4065 .ok(|_| {})
4066 .mock_once()
4067 .named("sync_with_timeout")
4068 .mount()
4069 .await;
4070
4071 // Call the endpoint once to check the timeout.
4072 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4073
4074 timeout(Duration::from_secs(1), async {
4075 stream.next().await.unwrap().unwrap();
4076 })
4077 .await
4078 .unwrap();
4079 }
4080
4081 #[async_test]
4082 async fn test_ignore_timeout_on_first_sync() {
4083 let server = MatrixMockServer::new().await;
4084 let client = server.client_builder().build().await;
4085
4086 server
4087 .mock_sync()
4088 .timeout(None)
4089 .ok(|_| {})
4090 .mock_once()
4091 .named("sync_no_timeout")
4092 .mount()
4093 .await;
4094 server
4095 .mock_sync()
4096 .timeout(Some(Duration::from_secs(30)))
4097 .ok(|_| {})
4098 .mock_once()
4099 .named("sync_with_timeout")
4100 .mount()
4101 .await;
4102
4103 // Call each version of the endpoint once to check the timeouts.
4104 let mut stream = Box::pin(
4105 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4106 );
4107
4108 timeout(Duration::from_secs(1), async {
4109 stream.next().await.unwrap().unwrap();
4110 stream.next().await.unwrap().unwrap();
4111 })
4112 .await
4113 .unwrap();
4114 }
4115}