matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{btree_map, BTreeMap, BTreeSet},
19 fmt::{self, Debug},
20 future::{ready, Future},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{store::LockableCryptoStore, DecryptionSettings};
33use matrix_sdk_base::{
34 event_cache::store::EventCacheStoreLock,
35 store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
36 sync::{Notification, RoomUpdates},
37 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
38 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
39};
40use matrix_sdk_common::ttl_cache::TtlCache;
41#[cfg(feature = "e2e-encryption")]
42use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
43use ruma::{
44 api::{
45 client::{
46 account::whoami,
47 alias::{create_alias, delete_alias, get_alias},
48 authenticated_media,
49 device::{delete_devices, get_devices, update_device},
50 directory::{get_public_rooms, get_public_rooms_filtered},
51 discovery::{
52 discover_homeserver::{self, RtcFocusInfo},
53 get_capabilities::{self, v3::Capabilities},
54 get_supported_versions,
55 },
56 error::ErrorKind,
57 filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
58 knock::knock_room,
59 media,
60 membership::{join_room_by_id, join_room_by_id_or_alias},
61 room::create_room,
62 session::login::v3::DiscoveryInfo,
63 sync::sync_events,
64 threads::get_thread_subscriptions_changes,
65 uiaa,
66 user_directory::search_users,
67 },
68 error::FromHttpResponseError,
69 federation::discovery::get_server_version,
70 FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
71 },
72 assign,
73 push::Ruleset,
74 time::Instant,
75 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
76 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
77};
78use serde::de::DeserializeOwned;
79use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
80use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
81use url::Url;
82
83use self::futures::SendRequest;
84use crate::{
85 authentication::{
86 matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
87 SaveSessionCallback,
88 },
89 config::{RequestConfig, SyncToken},
90 deduplicating_handler::DeduplicatingHandler,
91 error::HttpResult,
92 event_cache::EventCache,
93 event_handler::{
94 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
95 EventHandlerStore, ObservableEventHandler, SyncEvent,
96 },
97 http_client::HttpClient,
98 latest_events::LatestEvents,
99 media::MediaError,
100 notification_settings::NotificationSettings,
101 room::RoomMember,
102 room_preview::RoomPreview,
103 send_queue::{SendQueue, SendQueueData},
104 sliding_sync::Version as SlidingSyncVersion,
105 sync::{RoomUpdate, SyncResponse},
106 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
107 Room, SessionTokens, TransmissionProgress,
108};
109#[cfg(feature = "e2e-encryption")]
110use crate::{
111 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
112 store_locks::CrossProcessStoreLock,
113};
114
115mod builder;
116pub(crate) mod caches;
117pub(crate) mod futures;
118#[cfg(feature = "experimental-search")]
119pub(crate) mod search;
120
121pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
122#[cfg(feature = "experimental-search")]
123use crate::client::search::SearchIndex;
124
/// Boxed future returned by a notification handler.
///
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed future returned by a notification handler.
///
/// wasm variant: same shape, without the `Send` bound.
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

/// Boxed callback taking a [`Notification`], a [`Room`] and a [`Client`], and
/// returning a [`NotificationHandlerFut`].
///
/// On non-wasm targets the callback is additionally required to be
/// `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed callback taking a [`Notification`], a [`Room`] and a [`Client`], and
/// returning a [`NotificationHandlerFut`].
///
/// wasm variant: same shape, without the `Send + Sync` bounds.
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
135
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
// NOTE(review): a `#method.…` anchor resolves relative to this enum's own
// documentation page; presumably the link is meant to point at a method of
// `Client` — confirm and switch to an intra-doc path if so.
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
149
/// Represents changes that can occur to a `Client`'s session.
#[derive(Debug, Clone, PartialEq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
161
/// Information about the server vendor obtained from the federation API.
///
/// Returned by [`Client::server_vendor_info`], which fills in `"unknown"` for
/// any field the server did not report.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The server name.
    pub server_name: String,
    /// The server version.
    pub version: String,
}
171
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
#[derive(Clone)]
pub struct Client {
    /// The shared state; every clone of this `Client` points at the same
    /// `ClientInner`.
    pub(crate) inner: Arc<ClientInner>,
}
179
/// Collection of locks and request-deduplicating handlers that individual
/// [`Client`] methods use, either to ensure that only a single call to a
/// method happens at once or to deduplicate multiple calls to a method.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    // NOTE(review): `ClientLocks` shows no `send_single_receipt` method in
    // this part of the file, so the `Self::` link below presumably refers to
    // a method on another type — confirm and fix the path if so.
    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock wrapping the crypto store.
    ///
    /// Stored in a `OnceCell`, so it starts out unset and can be set at most
    /// once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock:
        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
259
/// The state shared by every clone of a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`; instances are created through
/// `ClientInner::new`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates, keyed by
    /// room.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    ///
    /// `ClientInner::new` makes sure this cell is initialized before it
    /// returns.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    // NOTE(review): this file imports `crate::latest_events::LatestEvents`,
    // while the link target below names `crate::latest_event` — confirm the
    // path resolves.
    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    #[cfg(feature = "experimental-search")]
    /// Handler for [`RoomIndex`]'s of each room
    search_index: SearchIndex,
}
376
377impl ClientInner {
378 /// Create a new `ClientInner`.
379 ///
380 /// All the fields passed as parameters here are those that must be cloned
381 /// upon instantiation of a sub-client, e.g. a client specialized for
382 /// notifications.
383 #[allow(clippy::too_many_arguments)]
384 async fn new(
385 auth_ctx: Arc<AuthCtx>,
386 server: Option<Url>,
387 homeserver: Url,
388 sliding_sync_version: SlidingSyncVersion,
389 http_client: HttpClient,
390 base_client: BaseClient,
391 server_info: ClientServerInfo,
392 respect_login_well_known: bool,
393 event_cache: OnceCell<EventCache>,
394 send_queue: Arc<SendQueueData>,
395 latest_events: OnceCell<LatestEvents>,
396 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
397 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
398 cross_process_store_locks_holder_name: String,
399 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
400 ) -> Arc<Self> {
401 let caches = ClientCaches {
402 server_info: server_info.into(),
403 server_metadata: Mutex::new(TtlCache::new()),
404 };
405
406 let client = Self {
407 server,
408 homeserver: StdRwLock::new(homeserver),
409 auth_ctx,
410 sliding_sync_version: StdRwLock::new(sliding_sync_version),
411 http_client,
412 base_client,
413 caches,
414 locks: Default::default(),
415 cross_process_store_locks_holder_name,
416 typing_notice_times: Default::default(),
417 event_handlers: Default::default(),
418 notification_handlers: Default::default(),
419 room_update_channels: Default::default(),
420 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
421 // ballast for all observers to catch up.
422 room_updates_sender: broadcast::Sender::new(32),
423 respect_login_well_known,
424 sync_beat: event_listener::Event::new(),
425 event_cache,
426 send_queue_data: send_queue,
427 latest_events,
428 #[cfg(feature = "e2e-encryption")]
429 e2ee: EncryptionData::new(encryption_settings),
430 #[cfg(feature = "e2e-encryption")]
431 verification_state: SharedObservable::new(VerificationState::Unknown),
432 #[cfg(feature = "e2e-encryption")]
433 enable_share_history_on_invite,
434 server_max_upload_size: Mutex::new(OnceCell::new()),
435 #[cfg(feature = "experimental-search")]
436 search_index: search_index_handler,
437 };
438
439 #[allow(clippy::let_and_return)]
440 let client = Arc::new(client);
441
442 #[cfg(feature = "e2e-encryption")]
443 client.e2ee.initialize_tasks(&client);
444
445 let _ = client
446 .event_cache
447 .get_or_init(|| async {
448 EventCache::new(
449 WeakClient::from_inner(&client),
450 client.base_client.event_cache_store().clone(),
451 )
452 })
453 .await;
454
455 client
456 }
457}
458
459#[cfg(not(tarpaulin_include))]
460impl Debug for Client {
461 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
462 write!(fmt, "Client")
463 }
464}
465
466impl Client {
467 /// Create a new [`Client`] that will use the given homeserver.
468 ///
469 /// # Arguments
470 ///
471 /// * `homeserver_url` - The homeserver that the client should connect to.
472 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
473 Self::builder().homeserver_url(homeserver_url).build().await
474 }
475
476 /// Returns a subscriber that publishes an event every time the ignore user
477 /// list changes.
478 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
479 self.inner.base_client.subscribe_to_ignore_user_list_changes()
480 }
481
/// Create a new [`ClientBuilder`].
///
/// This is the entry point for configuring a [`Client`] before building it;
/// [`Client::new`] goes through it as well.
pub fn builder() -> ClientBuilder {
    ClientBuilder::new()
}
486
487 pub(crate) fn base_client(&self) -> &BaseClient {
488 &self.inner.base_client
489 }
490
491 /// The underlying HTTP client.
492 pub fn http_client(&self) -> &reqwest::Client {
493 &self.inner.http_client.inner
494 }
495
496 pub(crate) fn locks(&self) -> &ClientLocks {
497 &self.inner.locks
498 }
499
500 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
501 &self.inner.auth_ctx
502 }
503
504 /// The cross-process store locks holder name.
505 ///
506 /// The SDK provides cross-process store locks (see
507 /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
508 /// `holder_name` is the value used for all cross-process store locks
509 /// used by this `Client`.
510 pub fn cross_process_store_locks_holder_name(&self) -> &str {
511 &self.inner.cross_process_store_locks_holder_name
512 }
513
514 /// Change the homeserver URL used by this client.
515 ///
516 /// # Arguments
517 ///
518 /// * `homeserver_url` - The new URL to use.
519 fn set_homeserver(&self, homeserver_url: Url) {
520 *self.inner.homeserver.write().unwrap() = homeserver_url;
521 }
522
523 /// Get the capabilities of the homeserver.
524 ///
525 /// This method should be used to check what features are supported by the
526 /// homeserver.
527 ///
528 /// # Examples
529 ///
530 /// ```no_run
531 /// # use matrix_sdk::Client;
532 /// # use url::Url;
533 /// # async {
534 /// # let homeserver = Url::parse("http://example.com")?;
535 /// let client = Client::new(homeserver).await?;
536 ///
537 /// let capabilities = client.get_capabilities().await?;
538 ///
539 /// if capabilities.change_password.enabled {
540 /// // Change password
541 /// }
542 /// # anyhow::Ok(()) };
543 /// ```
544 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
545 let res = self.send(get_capabilities::v3::Request::new()).await?;
546 Ok(res.capabilities)
547 }
548
549 /// Get the server vendor information from the federation API.
550 ///
551 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
552 /// both the server's software name and version.
553 ///
554 /// # Examples
555 ///
556 /// ```no_run
557 /// # use matrix_sdk::Client;
558 /// # use url::Url;
559 /// # async {
560 /// # let homeserver = Url::parse("http://example.com")?;
561 /// let client = Client::new(homeserver).await?;
562 ///
563 /// let server_info = client.server_vendor_info().await?;
564 /// println!(
565 /// "Server: {}, Version: {}",
566 /// server_info.server_name, server_info.version
567 /// );
568 /// # anyhow::Ok(()) };
569 /// ```
570 pub async fn server_vendor_info(&self) -> HttpResult<ServerVendorInfo> {
571 let res = self.send(get_server_version::v1::Request::new()).await?;
572
573 // Extract server info, using defaults if fields are missing.
574 let server = res.server.unwrap_or_default();
575 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
576 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
577
578 Ok(ServerVendorInfo { server_name: server_name_str, version })
579 }
580
581 /// Get a copy of the default request config.
582 ///
583 /// The default request config is what's used when sending requests if no
584 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
585 /// function with such a parameter.
586 ///
587 /// If the default request config was not customized through
588 /// [`ClientBuilder`] when creating this `Client`, the returned value will
589 /// be equivalent to [`RequestConfig::default()`].
590 pub fn request_config(&self) -> RequestConfig {
591 self.inner.http_client.request_config
592 }
593
594 /// Check whether the client has been activated.
595 ///
596 /// A client is considered active when:
597 ///
598 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
599 /// is logged in,
600 /// 2. Has loaded cached data from storage,
601 /// 3. If encryption is enabled, it also initialized or restored its
602 /// `OlmMachine`.
603 pub fn is_active(&self) -> bool {
604 self.inner.base_client.is_active()
605 }
606
607 /// The server used by the client.
608 ///
609 /// See `Self::server` to learn more.
610 pub fn server(&self) -> Option<&Url> {
611 self.inner.server.as_ref()
612 }
613
614 /// The homeserver of the client.
615 pub fn homeserver(&self) -> Url {
616 self.inner.homeserver.read().unwrap().clone()
617 }
618
619 /// Get the sliding sync version.
620 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
621 self.inner.sliding_sync_version.read().unwrap().clone()
622 }
623
624 /// Override the sliding sync version.
625 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
626 let mut lock = self.inner.sliding_sync_version.write().unwrap();
627 *lock = version;
628 }
629
630 /// Get the Matrix user session meta information.
631 ///
632 /// If the client is currently logged in, this will return a
633 /// [`SessionMeta`] object which contains the user ID and device ID.
634 /// Otherwise it returns `None`.
635 pub fn session_meta(&self) -> Option<&SessionMeta> {
636 self.base_client().session_meta()
637 }
638
639 /// Returns a receiver that gets events for each room info update. To watch
640 /// for new events, use `receiver.resubscribe()`.
641 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
642 self.base_client().room_info_notable_update_receiver()
643 }
644
645 /// Performs a search for users.
646 /// The search is performed case-insensitively on user IDs and display names
647 ///
648 /// # Arguments
649 ///
650 /// * `search_term` - The search term for the search
651 /// * `limit` - The maximum number of results to return. Defaults to 10.
652 ///
653 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
654 pub async fn search_users(
655 &self,
656 search_term: &str,
657 limit: u64,
658 ) -> HttpResult<search_users::v3::Response> {
659 let mut request = search_users::v3::Request::new(search_term.to_owned());
660
661 if let Some(limit) = UInt::new(limit) {
662 request.limit = limit;
663 }
664
665 self.send(request).await
666 }
667
668 /// Get the user id of the current owner of the client.
669 pub fn user_id(&self) -> Option<&UserId> {
670 self.session_meta().map(|s| s.user_id.as_ref())
671 }
672
673 /// Get the device ID that identifies the current session.
674 pub fn device_id(&self) -> Option<&DeviceId> {
675 self.session_meta().map(|s| s.device_id.as_ref())
676 }
677
678 /// Get the current access token for this session.
679 ///
680 /// Will be `None` if the client has not been logged in.
681 pub fn access_token(&self) -> Option<String> {
682 self.auth_ctx().access_token()
683 }
684
685 /// Get the current tokens for this session.
686 ///
687 /// To be notified of changes in the session tokens, use
688 /// [`Client::subscribe_to_session_changes()`] or
689 /// [`Client::set_session_callbacks()`].
690 ///
691 /// Returns `None` if the client has not been logged in.
692 pub fn session_tokens(&self) -> Option<SessionTokens> {
693 self.auth_ctx().session_tokens()
694 }
695
696 /// Access the authentication API used to log in this client.
697 ///
698 /// Will be `None` if the client has not been logged in.
699 pub fn auth_api(&self) -> Option<AuthApi> {
700 match self.auth_ctx().auth_data.get()? {
701 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
702 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
703 }
704 }
705
706 /// Get the whole session info of this client.
707 ///
708 /// Will be `None` if the client has not been logged in.
709 ///
710 /// Can be used with [`Client::restore_session`] to restore a previously
711 /// logged-in session.
712 pub fn session(&self) -> Option<AuthSession> {
713 match self.auth_api()? {
714 AuthApi::Matrix(api) => api.session().map(Into::into),
715 AuthApi::OAuth(api) => api.full_session().map(Into::into),
716 }
717 }
718
719 /// Get a reference to the state store.
720 pub fn state_store(&self) -> &DynStateStore {
721 self.base_client().state_store()
722 }
723
724 /// Get a reference to the event cache store.
725 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
726 self.base_client().event_cache_store()
727 }
728
729 /// Access the native Matrix authentication API with this client.
730 pub fn matrix_auth(&self) -> MatrixAuth {
731 MatrixAuth::new(self.clone())
732 }
733
734 /// Get the account of the current owner of the client.
735 pub fn account(&self) -> Account {
736 Account::new(self.clone())
737 }
738
/// Get the encryption manager of the client.
///
/// The returned value holds its own clone of the client.
#[cfg(feature = "e2e-encryption")]
pub fn encryption(&self) -> Encryption {
    let client = self.clone();
    Encryption::new(client)
}
744
745 /// Get the media manager of the client.
746 pub fn media(&self) -> Media {
747 Media::new(self.clone())
748 }
749
750 /// Get the pusher manager of the client.
751 pub fn pusher(&self) -> Pusher {
752 Pusher::new(self.clone())
753 }
754
755 /// Access the OAuth 2.0 API of the client.
756 pub fn oauth(&self) -> OAuth {
757 OAuth::new(self.clone())
758 }
759
760 /// Register a handler for a specific event type.
761 ///
762 /// The handler is a function or closure with one or more arguments. The
763 /// first argument is the event itself. All additional arguments are
764 /// "context" arguments: They have to implement [`EventHandlerContext`].
765 /// This trait is named that way because most of the types implementing it
766 /// give additional context about an event: The room it was in, its raw form
767 /// and other similar things. As two exceptions to this,
768 /// [`Client`] and [`EventHandlerHandle`] also implement the
769 /// `EventHandlerContext` trait so you don't have to clone your client
770 /// into the event handler manually and a handler can decide to remove
771 /// itself.
772 ///
773 /// Some context arguments are not universally applicable. A context
774 /// argument that isn't available for the given event type will result in
775 /// the event handler being skipped and an error being logged. The following
776 /// context argument types are only available for a subset of event types:
777 ///
778 /// * [`Room`] is only available for room-specific events, i.e. not for
779 /// events like global account data events or presence events.
780 ///
781 /// You can provide custom context via
782 /// [`add_event_handler_context`](Client::add_event_handler_context) and
783 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
784 /// into the event handler.
785 ///
786 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
787 ///
788 /// # Examples
789 ///
790 /// ```no_run
791 /// use matrix_sdk::{
792 /// deserialized_responses::EncryptionInfo,
793 /// event_handler::Ctx,
794 /// ruma::{
795 /// events::{
796 /// macros::EventContent,
797 /// push_rules::PushRulesEvent,
798 /// room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
799 /// },
800 /// push::Action,
801 /// Int, MilliSecondsSinceUnixEpoch,
802 /// },
803 /// Client, Room,
804 /// };
805 /// use serde::{Deserialize, Serialize};
806 ///
807 /// # async fn example(client: Client) {
808 /// client.add_event_handler(
809 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
810 /// // Common usage: Room event plus room and client.
811 /// },
812 /// );
813 /// client.add_event_handler(
814 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
815 /// async move {
816 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
817 /// // unencrypted events and events that were decrypted by the SDK.
818 /// }
819 /// },
820 /// );
821 /// client.add_event_handler(
822 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
823 /// async move {
824 /// // A `Vec<Action>` parameter allows you to know which push actions
825 /// // are applicable for an event. For example, an event with
826 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
827 /// // in the timeline.
828 /// }
829 /// },
830 /// );
831 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
832 /// // You can omit any or all arguments after the first.
833 /// });
834 ///
835 /// // Registering a temporary event handler:
836 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
837 /// /* Event handler */
838 /// });
839 /// client.remove_event_handler(handle);
840 ///
841 /// // Registering custom event handler context:
842 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
843 /// struct MyContext {
844 /// number: usize,
845 /// }
846 /// client.add_event_handler_context(MyContext { number: 5 });
847 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
848 /// // Use the context
849 /// });
850 ///
851 /// // Custom events work exactly the same way, you just need to declare
852 /// // the content struct and use the EventContent derive macro on it.
853 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
854 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
855 /// struct TokenEventContent {
856 /// token: String,
857 /// #[serde(rename = "exp")]
858 /// expires_at: MilliSecondsSinceUnixEpoch,
859 /// }
860 ///
861 /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
862 /// todo!("Display the token");
863 /// });
864 ///
865 /// // Event handler closures can also capture local variables.
866 /// // Make sure they are cheap to clone though, because they will be cloned
867 /// // every time the closure is called.
868 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
869 ///
870 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
871 /// println!("Calling the handler with identifier {data}");
872 /// });
873 /// # }
874 /// ```
875 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
876 where
877 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
878 H: EventHandler<Ev, Ctx>,
879 {
880 self.add_event_handler_impl(handler, None)
881 }
882
883 /// Register a handler for a specific room, and event type.
884 ///
885 /// This method works the same way as
886 /// [`add_event_handler`][Self::add_event_handler], except that the handler
887 /// will only be called for events in the room with the specified ID. See
888 /// that method for more details on event handler functions.
889 ///
890 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
891 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
892 /// your use case.
893 pub fn add_room_event_handler<Ev, Ctx, H>(
894 &self,
895 room_id: &RoomId,
896 handler: H,
897 ) -> EventHandlerHandle
898 where
899 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
900 H: EventHandler<Ev, Ctx>,
901 {
902 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
903 }
904
905 /// Observe a specific event type.
906 ///
907 /// `Ev` represents the kind of event that will be observed. `Ctx`
908 /// represents the context that will come with the event. It relies on the
909 /// same mechanism as [`Client::add_event_handler`]. The main difference is
910 /// that it returns an [`ObservableEventHandler`] and doesn't require a
911 /// user-defined closure. It is possible to subscribe to the
912 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
913 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
914 /// Ctx)`.
915 ///
916 /// Be careful that only the most recent value can be observed. Subscribers
917 /// are notified when a new value is sent, but there is no guarantee
918 /// that they will see all values.
919 ///
920 /// # Example
921 ///
922 /// Let's see a classical usage:
923 ///
924 /// ```
925 /// use futures_util::StreamExt as _;
926 /// use matrix_sdk::{
927 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
928 /// Client, Room,
929 /// };
930 ///
931 /// # async fn example(client: Client) -> Option<()> {
932 /// let observer =
933 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
934 ///
935 /// let mut subscriber = observer.subscribe();
936 ///
937 /// let (event, (room, push_actions)) = subscriber.next().await?;
938 /// # Some(())
939 /// # }
940 /// ```
941 ///
942 /// Now let's see how to get several contexts that can be useful for you:
943 ///
944 /// ```
945 /// use matrix_sdk::{
946 /// deserialized_responses::EncryptionInfo,
947 /// ruma::{
948 /// events::room::{
949 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
950 /// },
951 /// push::Action,
952 /// },
953 /// Client, Room,
954 /// };
955 ///
956 /// # async fn example(client: Client) {
957 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
958 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
959 ///
960 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
961 /// // to distinguish between unencrypted events and events that were decrypted
962 /// // by the SDK.
963 /// let _ = client
964 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
965 /// );
966 ///
967 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
968 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
969 /// // should be highlighted in the timeline.
970 /// let _ =
971 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
972 ///
973 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
974 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
975 /// # }
976 /// ```
977 ///
978 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
979 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
980 where
981 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
982 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
983 {
984 self.observe_room_events_impl(None)
985 }
986
987 /// Observe a specific room, and event type.
988 ///
989 /// This method works the same way as [`Client::observe_events`], except
990 /// that the observability will only be applied for events in the room with
991 /// the specified ID. See that method for more details.
992 ///
993 /// Be careful that only the most recent value can be observed. Subscribers
994 /// are notified when a new value is sent, but there is no guarantee
995 /// that they will see all values.
996 pub fn observe_room_events<Ev, Ctx>(
997 &self,
998 room_id: &RoomId,
999 ) -> ObservableEventHandler<(Ev, Ctx)>
1000 where
1001 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1002 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1003 {
1004 self.observe_room_events_impl(Some(room_id.to_owned()))
1005 }
1006
1007 /// Shared implementation for `Client::observe_events` and
1008 /// `Client::observe_room_events`.
1009 fn observe_room_events_impl<Ev, Ctx>(
1010 &self,
1011 room_id: Option<OwnedRoomId>,
1012 ) -> ObservableEventHandler<(Ev, Ctx)>
1013 where
1014 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1015 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1016 {
1017 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1018 // new value.
1019 let shared_observable = SharedObservable::new(None);
1020
1021 ObservableEventHandler::new(
1022 shared_observable.clone(),
1023 self.event_handler_drop_guard(self.add_event_handler_impl(
1024 move |event: Ev, context: Ctx| {
1025 shared_observable.set(Some((event, context)));
1026
1027 ready(())
1028 },
1029 room_id,
1030 )),
1031 )
1032 }
1033
1034 /// Remove the event handler associated with the handle.
1035 ///
1036 /// Note that you **must not** call `remove_event_handler` from the
1037 /// non-async part of an event handler, that is:
1038 ///
1039 /// ```ignore
1040 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1041 /// // ⚠ this will cause a deadlock ⚠
1042 /// client.remove_event_handler(handle);
1043 ///
1044 /// async move {
1045 /// // removing the event handler here is fine
1046 /// client.remove_event_handler(handle);
1047 /// }
1048 /// })
1049 /// ```
1050 ///
1051 /// Note also that handlers that remove themselves will still execute with
1052 /// events received in the same sync cycle.
1053 ///
1054 /// # Arguments
1055 ///
1056 /// `handle` - The [`EventHandlerHandle`] that is returned when
1057 /// registering the event handler with [`Client::add_event_handler`].
1058 ///
1059 /// # Examples
1060 ///
1061 /// ```no_run
1062 /// # use url::Url;
1063 /// # use tokio::sync::mpsc;
1064 /// #
1065 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1066 /// #
1067 /// use matrix_sdk::{
1068 /// event_handler::EventHandlerHandle,
1069 /// ruma::events::room::member::SyncRoomMemberEvent, Client,
1070 /// };
1071 /// #
1072 /// # futures_executor::block_on(async {
1073 /// # let client = matrix_sdk::Client::builder()
1074 /// # .homeserver_url(homeserver)
1075 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1076 /// # .build()
1077 /// # .await
1078 /// # .unwrap();
1079 ///
1080 /// client.add_event_handler(
1081 /// |ev: SyncRoomMemberEvent,
1082 /// client: Client,
1083 /// handle: EventHandlerHandle| async move {
1084 /// // Common usage: Check arriving Event is the expected one
1085 /// println!("Expected RoomMemberEvent received!");
1086 /// client.remove_event_handler(handle);
1087 /// },
1088 /// );
1089 /// # });
1090 /// ```
1091 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1092 self.inner.event_handlers.remove(handle);
1093 }
1094
1095 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1096 /// the given handle.
1097 ///
1098 /// When the returned value is dropped, the event handler will be removed.
1099 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1100 EventHandlerDropGuard::new(handle, self.clone())
1101 }
1102
1103 /// Add an arbitrary value for use as event handler context.
1104 ///
1105 /// The value can be obtained in an event handler by adding an argument of
1106 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1107 ///
1108 /// If a value of the same type has been added before, it will be
1109 /// overwritten.
1110 ///
1111 /// # Examples
1112 ///
1113 /// ```no_run
1114 /// use matrix_sdk::{
1115 /// event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1116 /// Room,
1117 /// };
1118 /// # #[derive(Clone)]
1119 /// # struct SomeType;
1120 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1121 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1122 /// # futures_executor::block_on(async {
1123 /// # let client = matrix_sdk::Client::builder()
1124 /// # .homeserver_url(homeserver)
1125 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1126 /// # .build()
1127 /// # .await
1128 /// # .unwrap();
1129 ///
1130 /// // Handle used to send messages to the UI part of the app
1131 /// let my_gui_handle: SomeType = obtain_gui_handle();
1132 ///
1133 /// client.add_event_handler_context(my_gui_handle.clone());
1134 /// client.add_event_handler(
1135 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1136 /// async move {
1137 /// // gui_handle.send(DisplayMessage { message: ev });
1138 /// }
1139 /// },
1140 /// );
1141 /// # });
1142 /// ```
1143 pub fn add_event_handler_context<T>(&self, ctx: T)
1144 where
1145 T: Clone + Send + Sync + 'static,
1146 {
1147 self.inner.event_handlers.add_context(ctx);
1148 }
1149
1150 /// Register a handler for a notification.
1151 ///
1152 /// Similar to [`Client::add_event_handler`], but only allows functions
1153 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1154 /// [`Client`] for now.
1155 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1156 where
1157 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1158 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1159 {
1160 self.inner.notification_handlers.write().await.push(Box::new(
1161 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1162 ));
1163
1164 self
1165 }
1166
1167 /// Subscribe to all updates for the room with the given ID.
1168 ///
1169 /// The returned receiver will receive a new message for each sync response
1170 /// that contains updates for that room.
1171 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1172 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1173 btree_map::Entry::Vacant(entry) => {
1174 let (tx, rx) = broadcast::channel(8);
1175 entry.insert(tx);
1176 rx
1177 }
1178 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1179 }
1180 }
1181
1182 /// Subscribe to all updates to all rooms, whenever any has been received in
1183 /// a sync response.
1184 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1185 self.inner.room_updates_sender.subscribe()
1186 }
1187
1188 pub(crate) async fn notification_handlers(
1189 &self,
1190 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1191 self.inner.notification_handlers.read().await
1192 }
1193
1194 /// Get all the rooms the client knows about.
1195 ///
1196 /// This will return the list of joined, invited, and left rooms.
1197 pub fn rooms(&self) -> Vec<Room> {
1198 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1199 }
1200
1201 /// Get all the rooms the client knows about, filtered by room state.
1202 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1203 self.base_client()
1204 .rooms_filtered(filter)
1205 .into_iter()
1206 .map(|room| Room::new(self.clone(), room))
1207 .collect()
1208 }
1209
1210 /// Get a stream of all the rooms, in addition to the existing rooms.
1211 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1212 let (rooms, stream) = self.base_client().rooms_stream();
1213
1214 let map_room = |room| Room::new(self.clone(), room);
1215
1216 (
1217 rooms.into_iter().map(map_room).collect(),
1218 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1219 )
1220 }
1221
1222 /// Returns the joined rooms this client knows about.
1223 pub fn joined_rooms(&self) -> Vec<Room> {
1224 self.rooms_filtered(RoomStateFilter::JOINED)
1225 }
1226
1227 /// Returns the invited rooms this client knows about.
1228 pub fn invited_rooms(&self) -> Vec<Room> {
1229 self.rooms_filtered(RoomStateFilter::INVITED)
1230 }
1231
1232 /// Returns the left rooms this client knows about.
1233 pub fn left_rooms(&self) -> Vec<Room> {
1234 self.rooms_filtered(RoomStateFilter::LEFT)
1235 }
1236
1237 /// Returns the joined space rooms this client knows about.
1238 pub fn joined_space_rooms(&self) -> Vec<Room> {
1239 self.base_client()
1240 .rooms_filtered(RoomStateFilter::JOINED)
1241 .into_iter()
1242 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1243 .collect()
1244 }
1245
1246 /// Get a room with the given room id.
1247 ///
1248 /// # Arguments
1249 ///
1250 /// `room_id` - The unique id of the room that should be fetched.
1251 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1252 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1253 }
1254
1255 /// Gets the preview of a room, whether the current user has joined it or
1256 /// not.
1257 pub async fn get_room_preview(
1258 &self,
1259 room_or_alias_id: &RoomOrAliasId,
1260 via: Vec<OwnedServerName>,
1261 ) -> Result<RoomPreview> {
1262 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1263 Ok(room_id) => room_id.to_owned(),
1264 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1265 };
1266
1267 if let Some(room) = self.get_room(&room_id) {
1268 // The cached data can only be trusted if the room state is joined or
1269 // banned: for invite and knock rooms, no updates will be received
1270 // for the rooms after the invite/knock action took place so we may
1271 // have very out to date data for important fields such as
1272 // `join_rule`. For left rooms, the homeserver should return the latest info.
1273 match room.state() {
1274 RoomState::Joined | RoomState::Banned => {
1275 return Ok(RoomPreview::from_known_room(&room).await);
1276 }
1277 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1278 }
1279 }
1280
1281 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1282 }
1283
1284 /// Resolve a room alias to a room id and a list of servers which know
1285 /// about it.
1286 ///
1287 /// # Arguments
1288 ///
1289 /// `room_alias` - The room alias to be resolved.
1290 pub async fn resolve_room_alias(
1291 &self,
1292 room_alias: &RoomAliasId,
1293 ) -> HttpResult<get_alias::v3::Response> {
1294 let request = get_alias::v3::Request::new(room_alias.to_owned());
1295 self.send(request).await
1296 }
1297
1298 /// Checks if a room alias is not in use yet.
1299 ///
1300 /// Returns:
1301 /// - `Ok(true)` if the room alias is available.
1302 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1303 /// status code).
1304 /// - An `Err` otherwise.
1305 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1306 match self.resolve_room_alias(alias).await {
1307 // The room alias was resolved, so it's already in use.
1308 Ok(_) => Ok(false),
1309 Err(error) => {
1310 match error.client_api_error_kind() {
1311 // The room alias wasn't found, so it's available.
1312 Some(ErrorKind::NotFound) => Ok(true),
1313 _ => Err(error),
1314 }
1315 }
1316 }
1317 }
1318
1319 /// Adds a new room alias associated with a room to the room directory.
1320 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1321 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1322 self.send(request).await?;
1323 Ok(())
1324 }
1325
1326 /// Removes a room alias from the room directory.
1327 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1328 let request = delete_alias::v3::Request::new(alias.to_owned());
1329 self.send(request).await?;
1330 Ok(())
1331 }
1332
1333 /// Update the homeserver from the login response well-known if needed.
1334 ///
1335 /// # Arguments
1336 ///
1337 /// * `login_well_known` - The `well_known` field from a successful login
1338 /// response.
1339 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1340 if self.inner.respect_login_well_known {
1341 if let Some(well_known) = login_well_known {
1342 if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1343 self.set_homeserver(homeserver);
1344 }
1345 }
1346 }
1347 }
1348
1349 /// Similar to [`Client::restore_session_with`], with
1350 /// [`RoomLoadSettings::default()`].
1351 ///
1352 /// # Panics
1353 ///
1354 /// Panics if a session was already restored or logged in.
1355 #[instrument(skip_all)]
1356 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1357 self.restore_session_with(session, RoomLoadSettings::default()).await
1358 }
1359
1360 /// Restore a session previously logged-in using one of the available
1361 /// authentication APIs. The number of rooms to restore is controlled by
1362 /// [`RoomLoadSettings`].
1363 ///
1364 /// See the documentation of the corresponding authentication API's
1365 /// `restore_session` method for more information.
1366 ///
1367 /// # Panics
1368 ///
1369 /// Panics if a session was already restored or logged in.
1370 #[instrument(skip_all)]
1371 pub async fn restore_session_with(
1372 &self,
1373 session: impl Into<AuthSession>,
1374 room_load_settings: RoomLoadSettings,
1375 ) -> Result<()> {
1376 let session = session.into();
1377 match session {
1378 AuthSession::Matrix(session) => {
1379 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1380 }
1381 AuthSession::OAuth(session) => {
1382 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1383 }
1384 }
1385 }
1386
1387 /// Refresh the access token using the authentication API used to log into
1388 /// this session.
1389 ///
1390 /// See the documentation of the authentication API's `refresh_access_token`
1391 /// method for more information.
1392 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1393 let Some(auth_api) = self.auth_api() else {
1394 return Err(RefreshTokenError::RefreshTokenRequired);
1395 };
1396
1397 match auth_api {
1398 AuthApi::Matrix(api) => {
1399 trace!("Token refresh: Using the homeserver.");
1400 Box::pin(api.refresh_access_token()).await?;
1401 }
1402 AuthApi::OAuth(api) => {
1403 trace!("Token refresh: Using OAuth 2.0.");
1404 Box::pin(api.refresh_access_token()).await?;
1405 }
1406 }
1407
1408 Ok(())
1409 }
1410
1411 /// Log out the current session using the proper authentication API.
1412 ///
1413 /// # Errors
1414 ///
1415 /// Returns an error if the session is not authenticated or if an error
1416 /// occurred while making the request to the server.
1417 pub async fn logout(&self) -> Result<(), Error> {
1418 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1419 match auth_api {
1420 AuthApi::Matrix(matrix_auth) => {
1421 matrix_auth.logout().await?;
1422 Ok(())
1423 }
1424 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1425 }
1426 }
1427
1428 /// Get or upload a sync filter.
1429 ///
1430 /// This method will either get a filter ID from the store or upload the
1431 /// filter definition to the homeserver and return the new filter ID.
1432 ///
1433 /// # Arguments
1434 ///
1435 /// * `filter_name` - The unique name of the filter, this name will be used
1436 /// locally to store and identify the filter ID returned by the server.
1437 ///
1438 /// * `definition` - The filter definition that should be uploaded to the
1439 /// server if no filter ID can be found in the store.
1440 ///
1441 /// # Examples
1442 ///
1443 /// ```no_run
1444 /// # use matrix_sdk::{
1445 /// # Client, config::SyncSettings,
1446 /// # ruma::api::client::{
1447 /// # filter::{
1448 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1449 /// # },
1450 /// # sync::sync_events::v3::Filter,
1451 /// # }
1452 /// # };
1453 /// # use url::Url;
1454 /// # async {
1455 /// # let homeserver = Url::parse("http://example.com").unwrap();
1456 /// # let client = Client::new(homeserver).await.unwrap();
1457 /// let mut filter = FilterDefinition::default();
1458 ///
1459 /// // Let's enable member lazy loading.
1460 /// filter.room.state.lazy_load_options =
1461 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1462 ///
1463 /// let filter_id = client
1464 /// .get_or_upload_filter("sync", filter)
1465 /// .await
1466 /// .unwrap();
1467 ///
1468 /// let sync_settings = SyncSettings::new()
1469 /// .filter(Filter::FilterId(filter_id));
1470 ///
1471 /// let response = client.sync_once(sync_settings).await.unwrap();
/// # };
/// ```
1473 #[instrument(skip(self, definition))]
1474 pub async fn get_or_upload_filter(
1475 &self,
1476 filter_name: &str,
1477 definition: FilterDefinition,
1478 ) -> Result<String> {
1479 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1480 debug!("Found filter locally");
1481 Ok(filter)
1482 } else {
1483 debug!("Didn't find filter locally");
1484 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1485 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1486 let response = self.send(request).await?;
1487
1488 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1489
1490 Ok(response.filter_id)
1491 }
1492 }
1493
1494 /// Prepare to join a room by ID, by getting the current details about it
1495 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1496 let room = self.get_room(room_id)?;
1497
1498 let inviter = match room.invite_details().await {
1499 Ok(details) => details.inviter,
1500 Err(Error::WrongRoomState(_)) => None,
1501 Err(e) => {
1502 warn!("Error fetching invite details for room: {e:?}");
1503 None
1504 }
1505 };
1506
1507 Some(PreJoinRoomInfo { inviter })
1508 }
1509
1510 /// Finish joining a room.
1511 ///
1512 /// If the room was an invite that should be marked as a DM, will include it
1513 /// in the DM event after creating the joined room.
1514 ///
1515 /// If encrypted history sharing is enabled, will check to see if we have a
1516 /// key bundle, and import it if so.
1517 ///
1518 /// # Arguments
1519 ///
1520 /// * `room_id` - The `RoomId` of the room that was joined.
1521 /// * `pre_join_room_info` - Information about the room before we joined.
1522 async fn finish_join_room(
1523 &self,
1524 room_id: &RoomId,
1525 pre_join_room_info: Option<PreJoinRoomInfo>,
1526 ) -> Result<Room> {
1527 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1528 room.state() == RoomState::Invited
1529 && room.is_direct().await.unwrap_or_else(|e| {
1530 warn!(%room_id, "is_direct() failed: {e}");
1531 false
1532 })
1533 } else {
1534 false
1535 };
1536
1537 let base_room = self
1538 .base_client()
1539 .room_joined(
1540 room_id,
1541 pre_join_room_info
1542 .as_ref()
1543 .and_then(|info| info.inviter.as_ref())
1544 .map(|i| i.user_id().to_owned()),
1545 )
1546 .await?;
1547 let room = Room::new(self.clone(), base_room);
1548
1549 if mark_as_dm {
1550 room.set_is_direct(true).await?;
1551 }
1552
1553 #[cfg(feature = "e2e-encryption")]
1554 if self.inner.enable_share_history_on_invite {
1555 if let Some(inviter) =
1556 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1557 {
1558 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1559 .await?;
1560 }
1561 }
1562
1563 // Suppress "unused variable" and "unused field" lints
1564 #[cfg(not(feature = "e2e-encryption"))]
1565 let _ = pre_join_room_info.map(|i| i.inviter);
1566
1567 Ok(room)
1568 }
1569
1570 /// Join a room by `RoomId`.
1571 ///
1572 /// Returns the `Room` in the joined state.
1573 ///
1574 /// # Arguments
1575 ///
1576 /// * `room_id` - The `RoomId` of the room to be joined.
1577 #[instrument(skip(self))]
1578 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1579 // See who invited us to this room, if anyone. Note we have to do this before
1580 // making the `/join` request, otherwise we could race against the sync.
1581 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1582
1583 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1584 let response = self.send(request).await?;
1585 self.finish_join_room(&response.room_id, pre_join_info).await
1586 }
1587
1588 /// Join a room by `RoomOrAliasId`.
1589 ///
1590 /// Returns the `Room` in the joined state.
1591 ///
1592 /// # Arguments
1593 ///
1594 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1595 /// alias looks like `#name:example.com`.
1596 /// * `server_names` - The server names to be used for resolving the alias,
1597 /// if needs be.
1598 pub async fn join_room_by_id_or_alias(
1599 &self,
1600 alias: &RoomOrAliasId,
1601 server_names: &[OwnedServerName],
1602 ) -> Result<Room> {
1603 let pre_join_info = {
1604 match alias.try_into() {
1605 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1606 Err(_) => {
1607 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1608 // responding to an invitation to the room, and therefore don't need to handle
1609 // things that happen as a result of invites.
1610 None
1611 }
1612 }
1613 };
1614 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1615 via: server_names.to_owned(),
1616 });
1617 let response = self.send(request).await?;
1618 self.finish_join_room(&response.room_id, pre_join_info).await
1619 }
1620
1621 /// Search the homeserver's directory of public rooms.
1622 ///
1623 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1624 /// a `get_public_rooms::Response`.
1625 ///
1626 /// # Arguments
1627 ///
1628 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1629 ///
1630 /// * `since` - Pagination token from a previous request.
1631 ///
1632 /// * `server` - The name of the server, if `None` the requested server is
1633 /// used.
1634 ///
1635 /// # Examples
1636 /// ```no_run
1637 /// use matrix_sdk::Client;
1638 /// # use url::Url;
1639 /// # let homeserver = Url::parse("http://example.com").unwrap();
1640 /// # let limit = Some(10);
1641 /// # let since = Some("since token");
1642 /// # let server = Some("servername.com".try_into().unwrap());
1643 /// # async {
1644 /// let mut client = Client::new(homeserver).await.unwrap();
1645 ///
1646 /// client.public_rooms(limit, since, server).await;
1647 /// # };
1648 /// ```
1649 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1650 pub async fn public_rooms(
1651 &self,
1652 limit: Option<u32>,
1653 since: Option<&str>,
1654 server: Option<&ServerName>,
1655 ) -> HttpResult<get_public_rooms::v3::Response> {
1656 let limit = limit.map(UInt::from);
1657
1658 let request = assign!(get_public_rooms::v3::Request::new(), {
1659 limit,
1660 since: since.map(ToOwned::to_owned),
1661 server: server.map(ToOwned::to_owned),
1662 });
1663 self.send(request).await
1664 }
1665
1666 /// Create a room with the given parameters.
1667 ///
1668 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1669 /// created room.
1670 ///
1671 /// If you want to create a direct message with one specific user, you can
1672 /// use [`create_dm`][Self::create_dm], which is more convenient than
1673 /// assembling the [`create_room::v3::Request`] yourself.
1674 ///
1675 /// If the `is_direct` field of the request is set to `true` and at least
1676 /// one user is invited, the room will be automatically added to the direct
1677 /// rooms in the account data.
1678 ///
1679 /// # Examples
1680 ///
1681 /// ```no_run
1682 /// use matrix_sdk::{
1683 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1684 /// Client,
1685 /// };
1686 /// # use url::Url;
1687 /// #
1688 /// # async {
1689 /// # let homeserver = Url::parse("http://example.com").unwrap();
1690 /// let request = CreateRoomRequest::new();
1691 /// let client = Client::new(homeserver).await.unwrap();
1692 /// assert!(client.create_room(request).await.is_ok());
1693 /// # };
1694 /// ```
1695 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1696 let invite = request.invite.clone();
1697 let is_direct_room = request.is_direct;
1698 let response = self.send(request).await?;
1699
1700 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1701
1702 let joined_room = Room::new(self.clone(), base_room);
1703
1704 if is_direct_room && !invite.is_empty() {
1705 if let Err(error) =
1706 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1707 {
1708 // FIXME: Retry in the background
1709 error!("Failed to mark room as DM: {error}");
1710 }
1711 }
1712
1713 Ok(joined_room)
1714 }
1715
1716 /// Create a DM room.
1717 ///
1718 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1719 /// given user being invited, the room marked `is_direct` and both the
1720 /// creator and invitee getting the default maximum power level.
1721 ///
1722 /// If the `e2e-encryption` feature is enabled, the room will also be
1723 /// encrypted.
1724 ///
1725 /// # Arguments
1726 ///
1727 /// * `user_id` - The ID of the user to create a DM for.
1728 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1729 #[cfg(feature = "e2e-encryption")]
1730 let initial_state =
1731 vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1732 .to_raw_any()];
1733
1734 #[cfg(not(feature = "e2e-encryption"))]
1735 let initial_state = vec![];
1736
1737 let request = assign!(create_room::v3::Request::new(), {
1738 invite: vec![user_id.to_owned()],
1739 is_direct: true,
1740 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1741 initial_state,
1742 });
1743
1744 self.create_room(request).await
1745 }
1746
1747 /// Search the homeserver's directory for public rooms with a filter.
1748 ///
1749 /// # Arguments
1750 ///
1751 /// * `room_search` - The easiest way to create this request is using the
1752 /// `get_public_rooms_filtered::Request` itself.
1753 ///
1754 /// # Examples
1755 ///
1756 /// ```no_run
1757 /// # use url::Url;
1758 /// # use matrix_sdk::Client;
1759 /// # async {
1760 /// # let homeserver = Url::parse("http://example.com")?;
1761 /// use matrix_sdk::ruma::{
1762 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1763 /// };
1764 /// # let mut client = Client::new(homeserver).await?;
1765 ///
1766 /// let mut filter = Filter::new();
1767 /// filter.generic_search_term = Some("rust".to_owned());
1768 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1769 /// request.filter = filter;
1770 ///
1771 /// let response = client.public_rooms_filtered(request).await?;
1772 ///
1773 /// for room in response.chunk {
1774 /// println!("Found room {room:?}");
1775 /// }
1776 /// # anyhow::Ok(()) };
1777 /// ```
1778 pub async fn public_rooms_filtered(
1779 &self,
1780 request: get_public_rooms_filtered::v3::Request,
1781 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1782 self.send(request).await
1783 }
1784
1785 /// Send an arbitrary request to the server, without updating client state.
1786 ///
1787 /// **Warning:** Because this method *does not* update the client state, it
1788 /// is important to make sure that you account for this yourself, and
1789 /// use wrapper methods where available. This method should *only* be
1790 /// used if a wrapper method for the endpoint you'd like to use is not
1791 /// available.
1792 ///
1793 /// # Arguments
1794 ///
1795 /// * `request` - A filled out and valid request for the endpoint to be hit
1796 ///
1797 /// * `timeout` - An optional request timeout setting, this overrides the
1798 /// default request setting if one was set.
1799 ///
1800 /// # Examples
1801 ///
1802 /// ```no_run
1803 /// # use matrix_sdk::{Client, config::SyncSettings};
1804 /// # use url::Url;
1805 /// # async {
1806 /// # let homeserver = Url::parse("http://localhost:8080")?;
1807 /// # let mut client = Client::new(homeserver).await?;
1808 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1809 ///
1810 /// // First construct the request you want to make
1811 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1812 /// // for all available Endpoints
1813 /// let user_id = user_id!("@example:localhost").to_owned();
1814 /// let request = profile::get_profile::v3::Request::new(user_id);
1815 ///
1816 /// // Start the request using Client::send()
1817 /// let response = client.send(request).await?;
1818 ///
1819 /// // Check the corresponding Response struct to find out what types are
1820 /// // returned
1821 /// # anyhow::Ok(()) };
1822 /// ```
1823 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1824 where
1825 Request: OutgoingRequest + Clone + Debug,
1826 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1827 {
1828 SendRequest {
1829 client: self.clone(),
1830 request,
1831 config: None,
1832 send_progress: Default::default(),
1833 }
1834 }
1835
1836 pub(crate) async fn send_inner<Request>(
1837 &self,
1838 request: Request,
1839 config: Option<RequestConfig>,
1840 send_progress: SharedObservable<TransmissionProgress>,
1841 ) -> HttpResult<Request::IncomingResponse>
1842 where
1843 Request: OutgoingRequest + Debug,
1844 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1845 {
1846 let homeserver = self.homeserver().to_string();
1847 let access_token = self.access_token();
1848
1849 self.inner
1850 .http_client
1851 .send(
1852 request,
1853 config,
1854 homeserver,
1855 access_token.as_deref(),
1856 &self.supported_versions().await?,
1857 send_progress,
1858 )
1859 .await
1860 }
1861
1862 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1863 _ = self
1864 .inner
1865 .auth_ctx
1866 .session_change_sender
1867 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1868 }
1869
1870 /// Fetches server versions from network; no caching.
1871 pub async fn fetch_server_versions(
1872 &self,
1873 request_config: Option<RequestConfig>,
1874 ) -> HttpResult<get_supported_versions::Response> {
1875 let server_versions = self
1876 .inner
1877 .http_client
1878 .send(
1879 get_supported_versions::Request::new(),
1880 request_config,
1881 self.homeserver().to_string(),
1882 None,
1883 &SupportedVersions {
1884 versions: [MatrixVersion::V1_0].into(),
1885 features: Default::default(),
1886 },
1887 Default::default(),
1888 )
1889 .await?;
1890
1891 Ok(server_versions)
1892 }
1893
1894 /// Fetches client well_known from network; no caching.
1895 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1896 let server_url_string = self
1897 .server()
1898 .unwrap_or(
1899 // Sometimes people configure their well-known directly on the homeserver so use
1900 // this as a fallback when the server name is unknown.
1901 &self.homeserver(),
1902 )
1903 .to_string();
1904
1905 let well_known = self
1906 .inner
1907 .http_client
1908 .send(
1909 discover_homeserver::Request::new(),
1910 Some(RequestConfig::short_retry()),
1911 server_url_string,
1912 None,
1913 &SupportedVersions {
1914 versions: [MatrixVersion::V1_0].into(),
1915 features: Default::default(),
1916 },
1917 Default::default(),
1918 )
1919 .await;
1920
1921 match well_known {
1922 Ok(well_known) => Some(well_known),
1923 Err(http_error) => {
1924 // It is perfectly valid to not have a well-known file.
1925 // Maybe we should check for a specific error code to be sure?
1926 warn!("Failed to fetch client well-known: {http_error}");
1927 None
1928 }
1929 }
1930 }
1931
1932 /// Load server info from storage, or fetch them from network and cache
1933 /// them.
1934 async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1935 match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1936 Ok(Some(stored)) => {
1937 if let Some(server_info) =
1938 stored.into_server_info().and_then(|info| info.maybe_decode())
1939 {
1940 return Ok(server_info);
1941 }
1942 }
1943 Ok(None) => {
1944 // fallthrough: cache is empty
1945 }
1946 Err(err) => {
1947 warn!("error when loading cached server info: {err}");
1948 // fallthrough to network.
1949 }
1950 }
1951
1952 let server_versions = self.fetch_server_versions(None).await?;
1953 let well_known = self.fetch_client_well_known().await;
1954 let server_info = ServerInfo::new(
1955 server_versions.versions.clone(),
1956 server_versions.unstable_features.clone(),
1957 well_known.map(Into::into),
1958 );
1959
1960 // Attempt to cache the result in storage.
1961 {
1962 if let Err(err) = self
1963 .state_store()
1964 .set_kv_data(
1965 StateStoreDataKey::ServerInfo,
1966 StateStoreDataValue::ServerInfo(server_info.clone()),
1967 )
1968 .await
1969 {
1970 warn!("error when caching server info: {err}");
1971 }
1972 }
1973
1974 Ok(server_info)
1975 }
1976
1977 async fn get_or_load_and_cache_server_info<
1978 Value,
1979 MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
1980 >(
1981 &self,
1982 map: MapFunction,
1983 ) -> HttpResult<Value> {
1984 let server_info = &self.inner.caches.server_info;
1985 if let CachedValue::Cached(val) = map(&*server_info.read().await) {
1986 return Ok(val);
1987 }
1988
1989 let mut guarded_server_info = server_info.write().await;
1990 if let CachedValue::Cached(val) = map(&guarded_server_info) {
1991 return Ok(val);
1992 }
1993
1994 let server_info = self.load_or_fetch_server_info().await?;
1995
1996 // Fill both unstable features and server versions at once.
1997 let mut supported = server_info.supported_versions();
1998 if supported.versions.is_empty() {
1999 supported.versions = [MatrixVersion::V1_0].into();
2000 }
2001
2002 guarded_server_info.supported_versions = CachedValue::Cached(supported);
2003 guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2004
2005 // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2006 // fetch an optional property), the function will always return some.
2007 Ok(map(&guarded_server_info).unwrap_cached_value())
2008 }
2009
2010 /// Get the Matrix versions and features supported by the homeserver by
2011 /// fetching them from the server or the cache.
2012 ///
2013 /// This is equivalent to calling both [`Client::server_versions()`] and
2014 /// [`Client::unstable_features()`]. To always fetch the result from the
2015 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2016 /// and then `.as_supported_versions()` on the response.
2017 ///
2018 /// # Examples
2019 ///
2020 /// ```no_run
2021 /// use ruma::api::{FeatureFlag, MatrixVersion};
2022 /// # use matrix_sdk::{Client, config::SyncSettings};
2023 /// # use url::Url;
2024 /// # async {
2025 /// # let homeserver = Url::parse("http://localhost:8080")?;
2026 /// # let mut client = Client::new(homeserver).await?;
2027 ///
2028 /// let supported = client.supported_versions().await?;
2029 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2030 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2031 ///
2032 /// let msc_x_feature = FeatureFlag::from("msc_x");
2033 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2034 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2035 /// # anyhow::Ok(()) };
2036 /// ```
2037 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2038 self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2039 .await
2040 }
2041
2042 /// Get the Matrix versions supported by the homeserver by fetching them
2043 /// from the server or the cache.
2044 ///
2045 /// # Examples
2046 ///
2047 /// ```no_run
2048 /// use ruma::api::MatrixVersion;
2049 /// # use matrix_sdk::{Client, config::SyncSettings};
2050 /// # use url::Url;
2051 /// # async {
2052 /// # let homeserver = Url::parse("http://localhost:8080")?;
2053 /// # let mut client = Client::new(homeserver).await?;
2054 ///
2055 /// let server_versions = client.server_versions().await?;
2056 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2057 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2058 /// # anyhow::Ok(()) };
2059 /// ```
2060 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2061 self.get_or_load_and_cache_server_info(|server_info| {
2062 server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2063 })
2064 .await
2065 }
2066
2067 /// Get the unstable features supported by the homeserver by fetching them
2068 /// from the server or the cache.
2069 ///
2070 /// # Examples
2071 ///
2072 /// ```no_run
2073 /// use matrix_sdk::ruma::api::FeatureFlag;
2074 /// # use matrix_sdk::{Client, config::SyncSettings};
2075 /// # use url::Url;
2076 /// # async {
2077 /// # let homeserver = Url::parse("http://localhost:8080")?;
2078 /// # let mut client = Client::new(homeserver).await?;
2079 ///
2080 /// let msc_x_feature = FeatureFlag::from("msc_x");
2081 /// let unstable_features = client.unstable_features().await?;
2082 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2083 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2084 /// # anyhow::Ok(()) };
2085 /// ```
2086 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2087 self.get_or_load_and_cache_server_info(|server_info| {
2088 server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2089 })
2090 .await
2091 }
2092
2093 /// Get information about the homeserver's advertised RTC foci by fetching
2094 /// the well-known file from the server or the cache.
2095 ///
2096 /// # Examples
2097 /// ```no_run
2098 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2099 /// # use url::Url;
2100 /// # async {
2101 /// # let homeserver = Url::parse("http://localhost:8080")?;
2102 /// # let mut client = Client::new(homeserver).await?;
2103 /// let rtc_foci = client.rtc_foci().await?;
2104 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2105 /// RtcFocusInfo::LiveKit(info) => Some(info),
2106 /// _ => None,
2107 /// });
2108 /// if let Some(info) = default_livekit_focus_info {
2109 /// println!("Default LiveKit service URL: {}", info.service_url);
2110 /// }
2111 /// # anyhow::Ok(()) };
2112 /// ```
2113 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2114 let well_known = self
2115 .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2116 .await?;
2117
2118 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2119 }
2120
2121 /// Empty the server version and unstable features cache.
2122 ///
2123 /// Since the SDK caches server info (versions, unstable features,
2124 /// well-known etc), it's possible to have a stale entry in the cache. This
/// function makes it possible to force reset it.
2126 pub async fn reset_server_info(&self) -> Result<()> {
2127 // Empty the in-memory caches.
2128 let mut guard = self.inner.caches.server_info.write().await;
2129 guard.supported_versions = CachedValue::NotSet;
2130
2131 // Empty the store cache.
2132 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2133 }
2134
2135 /// Check whether MSC 4028 is enabled on the homeserver.
2136 ///
2137 /// # Examples
2138 ///
2139 /// ```no_run
2140 /// # use matrix_sdk::{Client, config::SyncSettings};
2141 /// # use url::Url;
2142 /// # async {
2143 /// # let homeserver = Url::parse("http://localhost:8080")?;
2144 /// # let mut client = Client::new(homeserver).await?;
2145 /// let msc4028_enabled =
2146 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2147 /// # anyhow::Ok(()) };
2148 /// ```
2149 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2150 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2151 }
2152
2153 /// Get information of all our own devices.
2154 ///
2155 /// # Examples
2156 ///
2157 /// ```no_run
2158 /// # use matrix_sdk::{Client, config::SyncSettings};
2159 /// # use url::Url;
2160 /// # async {
2161 /// # let homeserver = Url::parse("http://localhost:8080")?;
2162 /// # let mut client = Client::new(homeserver).await?;
2163 /// let response = client.devices().await?;
2164 ///
2165 /// for device in response.devices {
2166 /// println!(
2167 /// "Device: {} {}",
2168 /// device.device_id,
2169 /// device.display_name.as_deref().unwrap_or("")
2170 /// );
2171 /// }
2172 /// # anyhow::Ok(()) };
2173 /// ```
2174 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2175 let request = get_devices::v3::Request::new();
2176
2177 self.send(request).await
2178 }
2179
2180 /// Delete the given devices from the server.
2181 ///
2182 /// # Arguments
2183 ///
2184 /// * `devices` - The list of devices that should be deleted from the
2185 /// server.
2186 ///
2187 /// * `auth_data` - This request requires user interactive auth, the first
2188 /// request needs to set this to `None` and will always fail with an
2189 /// `UiaaResponse`. The response will contain information for the
2190 /// interactive auth and the same request needs to be made but this time
2191 /// with some `auth_data` provided.
2192 ///
2193 /// ```no_run
2194 /// # use matrix_sdk::{
2195 /// # ruma::{api::client::uiaa, device_id},
2196 /// # Client, Error, config::SyncSettings,
2197 /// # };
2198 /// # use serde_json::json;
2199 /// # use url::Url;
2200 /// # use std::collections::BTreeMap;
2201 /// # async {
2202 /// # let homeserver = Url::parse("http://localhost:8080")?;
2203 /// # let mut client = Client::new(homeserver).await?;
2204 /// let devices = &[device_id!("DEVICEID").to_owned()];
2205 ///
2206 /// if let Err(e) = client.delete_devices(devices, None).await {
2207 /// if let Some(info) = e.as_uiaa_response() {
2208 /// let mut password = uiaa::Password::new(
2209 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2210 /// "wordpass".to_owned(),
2211 /// );
2212 /// password.session = info.session.clone();
2213 ///
2214 /// client
2215 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2216 /// .await?;
2217 /// }
2218 /// }
/// # anyhow::Ok(()) };
/// ```
2220 pub async fn delete_devices(
2221 &self,
2222 devices: &[OwnedDeviceId],
2223 auth_data: Option<uiaa::AuthData>,
2224 ) -> HttpResult<delete_devices::v3::Response> {
2225 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2226 request.auth = auth_data;
2227
2228 self.send(request).await
2229 }
2230
2231 /// Change the display name of a device owned by the current user.
2232 ///
2233 /// Returns a `update_device::Response` which specifies the result
2234 /// of the operation.
2235 ///
2236 /// # Arguments
2237 ///
2238 /// * `device_id` - The ID of the device to change the display name of.
2239 /// * `display_name` - The new display name to set.
2240 pub async fn rename_device(
2241 &self,
2242 device_id: &DeviceId,
2243 display_name: &str,
2244 ) -> HttpResult<update_device::v3::Response> {
2245 let mut request = update_device::v3::Request::new(device_id.to_owned());
2246 request.display_name = Some(display_name.to_owned());
2247
2248 self.send(request).await
2249 }
2250
2251 /// Synchronize the client's state with the latest state on the server.
2252 ///
2253 /// ## Syncing Events
2254 ///
2255 /// Messages or any other type of event need to be periodically fetched from
2256 /// the server, this is achieved by sending a `/sync` request to the server.
2257 ///
2258 /// The first sync is sent out without a [`token`]. The response of the
2259 /// first sync will contain a [`next_batch`] field which should then be
2260 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2261 /// don't receive the same events multiple times.
2262 ///
2263 /// ## Long Polling
2264 ///
2265 /// A sync should in the usual case always be in flight. The
2266 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2267 /// long the server will wait for new events before it will respond.
2268 /// The server will respond immediately if some new events arrive before the
2269 /// timeout has expired. If no changes arrive and the timeout expires an
2270 /// empty sync response will be sent to the client.
2271 ///
2272 /// This method of sending a request that may not receive a response
2273 /// immediately is called long polling.
2274 ///
2275 /// ## Filtering Events
2276 ///
2277 /// The number or type of messages and events that the client should receive
2278 /// from the server can be altered using a [`Filter`].
2279 ///
2280 /// Filters can be non-trivial and, since they will be sent with every sync
2281 /// request, they may take up a bunch of unnecessary bandwidth.
2282 ///
2283 /// Luckily filters can be uploaded to the server and reused using an unique
2284 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2285 /// method.
2286 ///
2287 /// # Arguments
2288 ///
2289 /// * `sync_settings` - Settings for the sync call, this allows us to set
2290 /// various options to configure the sync:
2291 /// * [`filter`] - To configure which events we receive and which get
2292 /// [filtered] by the server
2293 /// * [`timeout`] - To configure our [long polling] setup.
2294 /// * [`token`] - To tell the server which events we already received
2295 /// and where we wish to continue syncing.
2296 /// * [`full_state`] - To tell the server that we wish to receive all
2297 /// state events, regardless of our configured [`token`].
2298 /// * [`set_presence`] - To tell the server to set the presence and to
2299 /// which state.
2300 ///
2301 /// # Examples
2302 ///
2303 /// ```no_run
2304 /// # use url::Url;
2305 /// # async {
2306 /// # let homeserver = Url::parse("http://localhost:8080")?;
2307 /// # let username = "";
2308 /// # let password = "";
2309 /// use matrix_sdk::{
2310 /// config::SyncSettings,
2311 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2312 /// };
2313 ///
2314 /// let client = Client::new(homeserver).await?;
2315 /// client.matrix_auth().login_username(username, password).send().await?;
2316 ///
2317 /// // Sync once so we receive the client state and old messages.
2318 /// client.sync_once(SyncSettings::default()).await?;
2319 ///
2320 /// // Register our handler so we start responding once we receive a new
2321 /// // event.
2322 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2323 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2324 /// });
2325 ///
2326 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2327 /// // from our `sync_once()` call automatically.
2328 /// client.sync(SyncSettings::default()).await;
2329 /// # anyhow::Ok(()) };
2330 /// ```
2331 ///
2332 /// [`sync`]: #method.sync
2333 /// [`SyncSettings`]: crate::config::SyncSettings
2334 /// [`token`]: crate::config::SyncSettings#method.token
2335 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2336 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2337 /// [`set_presence`]: ruma::presence::PresenceState
2338 /// [`filter`]: crate::config::SyncSettings#method.filter
2339 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2340 /// [`next_batch`]: SyncResponse#structfield.next_batch
2341 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2342 /// [long polling]: #long-polling
2343 /// [filtered]: #filtering-events
2344 #[instrument(skip(self))]
2345 pub async fn sync_once(
2346 &self,
2347 sync_settings: crate::config::SyncSettings,
2348 ) -> Result<SyncResponse> {
2349 // The sync might not return for quite a while due to the timeout.
2350 // We'll see if there's anything crypto related to send out before we
2351 // sync, i.e. if we closed our client after a sync but before the
2352 // crypto requests were sent out.
2353 //
2354 // This will mostly be a no-op.
2355 #[cfg(feature = "e2e-encryption")]
2356 if let Err(e) = self.send_outgoing_requests().await {
2357 error!(error = ?e, "Error while sending outgoing E2EE requests");
2358 }
2359
2360 let token = match sync_settings.token {
2361 SyncToken::Specific(token) => Some(token),
2362 SyncToken::NoToken => None,
2363 SyncToken::ReusePrevious => self.sync_token().await,
2364 };
2365
2366 let request = assign!(sync_events::v3::Request::new(), {
2367 filter: sync_settings.filter.map(|f| *f),
2368 since: token,
2369 full_state: sync_settings.full_state,
2370 set_presence: sync_settings.set_presence,
2371 timeout: sync_settings.timeout,
2372 use_state_after: true,
2373 });
2374 let mut request_config = self.request_config();
2375 if let Some(timeout) = sync_settings.timeout {
2376 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2377 request_config.timeout = Some(base_timeout + timeout);
2378 }
2379
2380 let response = self.send(request).with_request_config(request_config).await?;
2381 let next_batch = response.next_batch.clone();
2382 let response = self.process_sync(response).await?;
2383
2384 #[cfg(feature = "e2e-encryption")]
2385 if let Err(e) = self.send_outgoing_requests().await {
2386 error!(error = ?e, "Error while sending outgoing E2EE requests");
2387 }
2388
2389 self.inner.sync_beat.notify(usize::MAX);
2390
2391 Ok(SyncResponse::new(next_batch, response))
2392 }
2393
2394 /// Repeatedly synchronize the client state with the server.
2395 ///
2396 /// This method will only return on error, if cancellation is needed
2397 /// the method should be wrapped in a cancelable task or the
2398 /// [`Client::sync_with_callback`] method can be used or
2399 /// [`Client::sync_with_result_callback`] if you want to handle error
2400 /// cases in the loop, too.
2401 ///
2402 /// This method will internally call [`Client::sync_once`] in a loop.
2403 ///
2404 /// This method can be used with the [`Client::add_event_handler`]
2405 /// method to react to individual events. If you instead wish to handle
2406 /// events in a bulk manner the [`Client::sync_with_callback`],
2407 /// [`Client::sync_with_result_callback`] and
2408 /// [`Client::sync_stream`] methods can be used instead. Those methods
2409 /// repeatedly return the whole sync response.
2410 ///
2411 /// # Arguments
2412 ///
2413 /// * `sync_settings` - Settings for the sync call. *Note* that those
2414 /// settings will be only used for the first sync call. See the argument
2415 /// docs for [`Client::sync_once`] for more info.
2416 ///
2417 /// # Return
2418 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2419 /// up to the user of the API to check the error and decide whether the sync
2420 /// should continue or not.
2421 ///
2422 /// # Examples
2423 ///
2424 /// ```no_run
2425 /// # use url::Url;
2426 /// # async {
2427 /// # let homeserver = Url::parse("http://localhost:8080")?;
2428 /// # let username = "";
2429 /// # let password = "";
2430 /// use matrix_sdk::{
2431 /// config::SyncSettings,
2432 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2433 /// };
2434 ///
2435 /// let client = Client::new(homeserver).await?;
2436 /// client.matrix_auth().login_username(&username, &password).send().await?;
2437 ///
2438 /// // Register our handler so we start responding once we receive a new
2439 /// // event.
2440 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2441 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2442 /// });
2443 ///
2444 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2445 /// // automatically.
2446 /// client.sync(SyncSettings::default()).await?;
2447 /// # anyhow::Ok(()) };
2448 /// ```
2449 ///
2450 /// [argument docs]: #method.sync_once
2451 /// [`sync_with_callback`]: #method.sync_with_callback
2452 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2453 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2454 }
2455
2456 /// Repeatedly call sync to synchronize the client state with the server.
2457 ///
2458 /// # Arguments
2459 ///
2460 /// * `sync_settings` - Settings for the sync call. *Note* that those
2461 /// settings will be only used for the first sync call. See the argument
2462 /// docs for [`Client::sync_once`] for more info.
2463 ///
/// * `callback` - A callback that will be called every time a successful
///   response has been fetched from the server. The callback must return a
///   `LoopCtrl` which signals whether the method should keep syncing: if the
///   callback returns `LoopCtrl::Continue` the sync will continue, if the
///   callback returns `LoopCtrl::Break` the sync will be stopped.
2469 ///
2470 /// # Return
2471 /// The sync runs until an error occurs or the
2472 /// callback indicates that the Loop should stop. If the callback asked for
2473 /// a regular stop, the result will be `Ok(())` otherwise the
2474 /// `Err(Error)` is returned.
2475 ///
2476 /// # Examples
2477 ///
2478 /// The following example demonstrates how to sync forever while sending all
2479 /// the interesting events through a mpsc channel to another thread e.g. a
2480 /// UI thread.
2481 ///
2482 /// ```no_run
2483 /// # use std::time::Duration;
2484 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2485 /// # use url::Url;
2486 /// # async {
2487 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2488 /// # let mut client = Client::new(homeserver).await.unwrap();
2489 ///
2490 /// use tokio::sync::mpsc::channel;
2491 ///
2492 /// let (tx, rx) = channel(100);
2493 ///
2494 /// let sync_channel = &tx;
2495 /// let sync_settings = SyncSettings::new()
2496 /// .timeout(Duration::from_secs(30));
2497 ///
2498 /// client
2499 /// .sync_with_callback(sync_settings, |response| async move {
2500 /// let channel = sync_channel;
2501 /// for (room_id, room) in response.rooms.joined {
2502 /// for event in room.timeline.events {
2503 /// channel.send(event).await.unwrap();
2504 /// }
2505 /// }
2506 ///
2507 /// LoopCtrl::Continue
2508 /// })
2509 /// .await;
2510 /// };
2511 /// ```
2512 #[instrument(skip_all)]
2513 pub async fn sync_with_callback<C>(
2514 &self,
2515 sync_settings: crate::config::SyncSettings,
2516 callback: impl Fn(SyncResponse) -> C,
2517 ) -> Result<(), Error>
2518 where
2519 C: Future<Output = LoopCtrl>,
2520 {
2521 self.sync_with_result_callback(sync_settings, |result| async {
2522 Ok(callback(result?).await)
2523 })
2524 .await
2525 }
2526
2527 /// Repeatedly call sync to synchronize the client state with the server.
2528 ///
2529 /// # Arguments
2530 ///
2531 /// * `sync_settings` - Settings for the sync call. *Note* that those
2532 /// settings will be only used for the first sync call. See the argument
2533 /// docs for [`Client::sync_once`] for more info.
2534 ///
2535 /// * `callback` - A callback that will be called every time after a
2536 /// response has been received, failure or not. The callback returns a
2537 /// `Result<LoopCtrl, Error>`, too. When returning
2538 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2539 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2540 /// function returns `Ok(())`. In case the callback can't handle the
2541 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2542 /// which results in the sync ending and the `Err(Error)` being returned.
2543 ///
2544 /// # Return
2545 /// The sync runs until an error occurs that the callback can't handle or
2546 /// the callback indicates that the Loop should stop. If the callback
2547 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2548 /// `Err(Error)` is returned.
2549 ///
/// _Note_: Lower-level configuration (e.g. for retries) is not changed by
/// this and is handled first, without sending the result to the callback.
/// Only after the retries have been exhausted is the `Result` handed to
/// the callback.
2554 ///
2555 /// # Examples
2556 ///
2557 /// The following example demonstrates how to sync forever while sending all
2558 /// the interesting events through a mpsc channel to another thread e.g. a
2559 /// UI thread.
2560 ///
2561 /// ```no_run
2562 /// # use std::time::Duration;
2563 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2564 /// # use url::Url;
2565 /// # async {
2566 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2567 /// # let mut client = Client::new(homeserver).await.unwrap();
2568 /// #
2569 /// use tokio::sync::mpsc::channel;
2570 ///
2571 /// let (tx, rx) = channel(100);
2572 ///
2573 /// let sync_channel = &tx;
2574 /// let sync_settings = SyncSettings::new()
2575 /// .timeout(Duration::from_secs(30));
2576 ///
2577 /// client
2578 /// .sync_with_result_callback(sync_settings, |response| async move {
2579 /// let channel = sync_channel;
2580 /// let sync_response = response?;
2581 /// for (room_id, room) in sync_response.rooms.joined {
2582 /// for event in room.timeline.events {
2583 /// channel.send(event).await.unwrap();
2584 /// }
2585 /// }
2586 ///
2587 /// Ok(LoopCtrl::Continue)
2588 /// })
2589 /// .await;
2590 /// };
2591 /// ```
2592 #[instrument(skip(self, callback))]
2593 pub async fn sync_with_result_callback<C>(
2594 &self,
2595 sync_settings: crate::config::SyncSettings,
2596 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2597 ) -> Result<(), Error>
2598 where
2599 C: Future<Output = Result<LoopCtrl, Error>>,
2600 {
2601 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2602
2603 while let Some(result) = sync_stream.next().await {
2604 trace!("Running callback");
2605 if callback(result).await? == LoopCtrl::Break {
2606 trace!("Callback told us to stop");
2607 break;
2608 }
2609 trace!("Done running callback");
2610 }
2611
2612 Ok(())
2613 }
2614
/// Repeatedly synchronize the client state with the server.
2616 ///
2617 /// This method will internally call [`Client::sync_once`] in a loop and is
2618 /// equivalent to the [`Client::sync`] method but the responses are provided
2619 /// as an async stream.
2620 ///
2621 /// # Arguments
2622 ///
2623 /// * `sync_settings` - Settings for the sync call. *Note* that those
2624 /// settings will be only used for the first sync call. See the argument
2625 /// docs for [`Client::sync_once`] for more info.
2626 ///
2627 /// # Examples
2628 ///
2629 /// ```no_run
2630 /// # use url::Url;
2631 /// # async {
2632 /// # let homeserver = Url::parse("http://localhost:8080")?;
2633 /// # let username = "";
2634 /// # let password = "";
2635 /// use futures_util::StreamExt;
2636 /// use matrix_sdk::{config::SyncSettings, Client};
2637 ///
2638 /// let client = Client::new(homeserver).await?;
2639 /// client.matrix_auth().login_username(&username, &password).send().await?;
2640 ///
2641 /// let mut sync_stream =
2642 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2643 ///
2644 /// while let Some(Ok(response)) = sync_stream.next().await {
2645 /// for room in response.rooms.joined.values() {
2646 /// for e in &room.timeline.events {
2647 /// if let Ok(event) = e.raw().deserialize() {
2648 /// println!("Received event {:?}", event);
2649 /// }
2650 /// }
2651 /// }
2652 /// }
2653 ///
2654 /// # anyhow::Ok(()) };
2655 /// ```
2656 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2657 #[instrument(skip(self))]
2658 pub async fn sync_stream(
2659 &self,
2660 mut sync_settings: crate::config::SyncSettings,
2661 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2662 let mut is_first_sync = true;
2663 let mut timeout = None;
2664 let mut last_sync_time: Option<Instant> = None;
2665
2666 let parent_span = Span::current();
2667
2668 async_stream::stream!({
2669 loop {
2670 trace!("Syncing");
2671
2672 if sync_settings.ignore_timeout_on_first_sync {
2673 if is_first_sync {
2674 timeout = sync_settings.timeout.take();
2675 } else if sync_settings.timeout.is_none() && timeout.is_some() {
2676 sync_settings.timeout = timeout.take();
2677 }
2678
2679 is_first_sync = false;
2680 }
2681
2682 yield self
2683 .sync_loop_helper(&mut sync_settings)
2684 .instrument(parent_span.clone())
2685 .await;
2686
2687 Client::delay_sync(&mut last_sync_time).await
2688 }
2689 })
2690 }
2691
2692 /// Get the current, if any, sync token of the client.
2693 /// This will be None if the client didn't sync at least once.
2694 pub(crate) async fn sync_token(&self) -> Option<String> {
2695 self.inner.base_client.sync_token().await
2696 }
2697
2698 /// Gets information about the owner of a given access token.
2699 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2700 let request = whoami::v3::Request::new();
2701 self.send(request).await
2702 }
2703
2704 /// Subscribes a new receiver to client SessionChange broadcasts.
2705 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2706 let broadcast = &self.auth_ctx().session_change_sender;
2707 broadcast.subscribe()
2708 }
2709
2710 /// Sets the save/restore session callbacks.
2711 ///
2712 /// This is another mechanism to get synchronous updates to session tokens,
2713 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2714 pub fn set_session_callbacks(
2715 &self,
2716 reload_session_callback: Box<ReloadSessionCallback>,
2717 save_session_callback: Box<SaveSessionCallback>,
2718 ) -> Result<()> {
2719 self.inner
2720 .auth_ctx
2721 .reload_session_callback
2722 .set(reload_session_callback)
2723 .map_err(|_| Error::MultipleSessionCallbacks)?;
2724
2725 self.inner
2726 .auth_ctx
2727 .save_session_callback
2728 .set(save_session_callback)
2729 .map_err(|_| Error::MultipleSessionCallbacks)?;
2730
2731 Ok(())
2732 }
2733
2734 /// Get the notification settings of the current owner of the client.
2735 pub async fn notification_settings(&self) -> NotificationSettings {
2736 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2737 NotificationSettings::new(self.clone(), ruleset)
2738 }
2739
2740 /// Create a new specialized `Client` that can process notifications.
2741 ///
2742 /// See [`CrossProcessStoreLock::new`] to learn more about
2743 /// `cross_process_store_locks_holder_name`.
2744 ///
2745 /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2746 pub async fn notification_client(
2747 &self,
2748 cross_process_store_locks_holder_name: String,
2749 ) -> Result<Client> {
2750 let client = Client {
2751 inner: ClientInner::new(
2752 self.inner.auth_ctx.clone(),
2753 self.server().cloned(),
2754 self.homeserver(),
2755 self.sliding_sync_version(),
2756 self.inner.http_client.clone(),
2757 self.inner
2758 .base_client
2759 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2760 .await?,
2761 self.inner.caches.server_info.read().await.clone(),
2762 self.inner.respect_login_well_known,
2763 self.inner.event_cache.clone(),
2764 self.inner.send_queue_data.clone(),
2765 self.inner.latest_events.clone(),
2766 #[cfg(feature = "e2e-encryption")]
2767 self.inner.e2ee.encryption_settings,
2768 #[cfg(feature = "e2e-encryption")]
2769 self.inner.enable_share_history_on_invite,
2770 cross_process_store_locks_holder_name,
2771 #[cfg(feature = "experimental-search")]
2772 self.inner.search_index.clone(),
2773 )
2774 .await,
2775 };
2776
2777 Ok(client)
2778 }
2779
2780 /// The [`EventCache`] instance for this [`Client`].
2781 pub fn event_cache(&self) -> &EventCache {
2782 // SAFETY: always initialized in the `Client` ctor.
2783 self.inner.event_cache.get().unwrap()
2784 }
2785
2786 /// The [`LatestEvents`] instance for this [`Client`].
2787 pub async fn latest_events(&self) -> &LatestEvents {
2788 self.inner
2789 .latest_events
2790 .get_or_init(|| async {
2791 LatestEvents::new(
2792 WeakClient::from_client(self),
2793 self.event_cache().clone(),
2794 SendQueue::new(self.clone()),
2795 )
2796 })
2797 .await
2798 }
2799
2800 /// Waits until an at least partially synced room is received, and returns
2801 /// it.
2802 ///
2803 /// **Note: this function will loop endlessly until either it finds the room
2804 /// or an externally set timeout happens.**
2805 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2806 loop {
2807 if let Some(room) = self.get_room(room_id) {
2808 if room.is_state_partially_or_fully_synced() {
2809 debug!("Found just created room!");
2810 return room;
2811 }
2812 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2813 } else {
2814 debug!("Room wasn't found, waiting for sync beat to try again");
2815 }
2816 self.inner.sync_beat.listen().await;
2817 }
2818 }
2819
2820 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2821 /// join it.
2822 pub async fn knock(
2823 &self,
2824 room_id_or_alias: OwnedRoomOrAliasId,
2825 reason: Option<String>,
2826 server_names: Vec<OwnedServerName>,
2827 ) -> Result<Room> {
2828 let request =
2829 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2830 let response = self.send(request).await?;
2831 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2832 Ok(Room::new(self.clone(), base_room))
2833 }
2834
2835 /// Checks whether the provided `user_id` belongs to an ignored user.
2836 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2837 self.base_client().is_user_ignored(user_id).await
2838 }
2839
2840 /// Gets the `max_upload_size` value from the homeserver, getting either a
2841 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2842 /// missing.
2843 ///
2844 /// Check the spec for more info:
2845 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2846 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2847 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2848 if let Some(data) = max_upload_size_lock.get() {
2849 return Ok(data.to_owned());
2850 }
2851
2852 // Use the authenticated endpoint when the server supports it.
2853 let supported_versions = self.supported_versions().await?;
2854 let use_auth =
2855 authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2856
2857 let upload_size = if use_auth {
2858 self.send(authenticated_media::get_media_config::v1::Request::default())
2859 .await?
2860 .upload_size
2861 } else {
2862 #[allow(deprecated)]
2863 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2864 };
2865
2866 match max_upload_size_lock.set(upload_size) {
2867 Ok(_) => Ok(upload_size),
2868 Err(error) => {
2869 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2870 }
2871 }
2872 }
2873
/// The [`DecryptionSettings`] applied when decrypting events, as stored on
/// the base client.
#[cfg(feature = "e2e-encryption")]
pub fn decryption_settings(&self) -> &DecryptionSettings {
    let base_client = self.base_client();
    &base_client.decryption_settings
}
2879
/// The [`SearchIndex`] held by this client.
#[cfg(feature = "experimental-search")]
pub(crate) fn search_index(&self) -> &SearchIndex {
    let inner = &self.inner;
    &inner.search_index
}
2884
2885 /// Whether the client is configured to take thread subscriptions (MSC4306
2886 /// and MSC4308) into account.
2887 ///
2888 /// This may cause filtering out of thread subscriptions, and loading the
2889 /// thread subscriptions via the sliding sync extension, when the room
2890 /// list service is being used.
2891 pub fn enabled_thread_subscriptions(&self) -> bool {
2892 match self.base_client().threading_support {
2893 ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2894 ThreadingSupport::Disabled => false,
2895 }
2896 }
2897
2898 /// Fetch thread subscriptions changes between `from` and up to `to`.
2899 ///
2900 /// The `limit` optional parameter can be used to limit the number of
2901 /// entries in a response. It can also be overridden by the server, if
2902 /// it's deemed too large.
2903 pub async fn fetch_thread_subscriptions(
2904 &self,
2905 from: Option<String>,
2906 to: Option<String>,
2907 limit: Option<UInt>,
2908 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2909 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2910 from,
2911 to,
2912 limit,
2913 });
2914 Ok(self.send(request).await?)
2915 }
2916}
2917
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper telling the crypto layer to track the given users.
    ///
    /// # Panics
    ///
    /// Panics if the client has no Olm machine, or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let olm_machine = self.olm_machine().await;
        olm_machine.as_ref().unwrap().update_tracked_users(user_ids).await.unwrap();
    }
}
2931
2932/// A weak reference to the inner client, useful when trying to get a handle
2933/// on the owning client.
2934#[derive(Clone, Debug)]
2935pub(crate) struct WeakClient {
2936 client: Weak<ClientInner>,
2937}
2938
2939impl WeakClient {
2940 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2941 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2942 Self { client: Arc::downgrade(client) }
2943 }
2944
2945 /// Construct a [`WeakClient`] from a [`Client`].
2946 pub fn from_client(client: &Client) -> Self {
2947 Self::from_inner(&client.inner)
2948 }
2949
2950 /// Attempts to get a [`Client`] from this [`WeakClient`].
2951 pub fn get(&self) -> Option<Client> {
2952 self.client.upgrade().map(|inner| Client { inner })
2953 }
2954
2955 /// Gets the number of strong (`Arc`) pointers still pointing to this
2956 /// client.
2957 #[allow(dead_code)]
2958 pub fn strong_count(&self) -> usize {
2959 self.client.strong_count()
2960 }
2961}
2962
2963#[derive(Clone)]
2964struct ClientServerInfo {
2965 /// The Matrix versions and unstable features the server supports (known
2966 /// ones only).
2967 supported_versions: CachedValue<SupportedVersions>,
2968
2969 /// The server's well-known file, if any.
2970 well_known: CachedValue<Option<WellKnownResponse>>,
2971}
2972
/// A value that may or may not have been cached yet.
///
/// Unlike a bare `Option`, this keeps "the value is known to be absent"
/// (cached as `None`) distinct from "nothing has been cached so far".
#[derive(Clone)]
enum CachedValue<Value> {
    /// A value has been cached.
    Cached(Value),
    /// Nothing has been cached yet.
    NotSet,
}

impl<Value> CachedValue<Value> {
    /// Consumes `self` and returns the cached value.
    ///
    /// # Panics
    ///
    /// Panics if nothing has been cached, i.e. on [`CachedValue::NotSet`].
    fn unwrap_cached_value(self) -> Value {
        let Self::Cached(value) = self else {
            panic!("Tried to unwrap a cached value that wasn't set");
        };

        value
    }

    /// Borrows the content: `&CachedValue<Value>` becomes
    /// `CachedValue<&Value>`.
    fn as_ref(&self) -> CachedValue<&Value> {
        match self {
            Self::NotSet => CachedValue::NotSet,
            Self::Cached(value) => CachedValue::Cached(value),
        }
    }

    /// Transforms the cached value with `f`, turning a
    /// `CachedValue<Value>` into a `CachedValue<Other>`.
    ///
    /// `f` is not called for [`CachedValue::NotSet`], which is passed
    /// through unchanged.
    fn map<Other, F>(self, f: F) -> CachedValue<Other>
    where
        F: FnOnce(Value) -> Other,
    {
        match self {
            Self::NotSet => CachedValue::NotSet,
            Self::Cached(value) => CachedValue::Cached(f(value)),
        }
    }
}
3018
3019/// Information about the state of a room before we joined it.
3020#[derive(Debug, Clone, Default)]
3021struct PreJoinRoomInfo {
3022 /// The user who invited us to the room, if any.
3023 pub inviter: Option<RoomMember>,
3024}
3025
3026// The http mocking library is not supported for wasm32
3027#[cfg(all(test, not(target_family = "wasm")))]
3028pub(crate) mod tests {
3029 use std::{sync::Arc, time::Duration};
3030
3031 use assert_matches::assert_matches;
3032 use assert_matches2::assert_let;
3033 use eyeball::SharedObservable;
3034 use futures_util::{pin_mut, FutureExt, StreamExt};
3035 use js_int::{uint, UInt};
3036 use matrix_sdk_base::{
3037 store::{MemoryStore, StoreConfig},
3038 RoomState,
3039 };
3040 use matrix_sdk_test::{
3041 async_test, event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent,
3042 SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
3043 };
3044 #[cfg(target_family = "wasm")]
3045 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3046
3047 use ruma::{
3048 api::{
3049 client::{
3050 discovery::discover_homeserver::RtcFocusInfo,
3051 room::create_room::v3::Request as CreateRoomRequest,
3052 },
3053 FeatureFlag, MatrixVersion,
3054 },
3055 assign,
3056 events::{
3057 ignored_user_list::IgnoredUserListEventContent,
3058 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3059 },
3060 owned_room_id, owned_user_id, room_alias_id, room_id, RoomId, ServerName, UserId,
3061 };
3062 use serde_json::json;
3063 use stream_assert::{assert_next_matches, assert_pending};
3064 use tokio::{
3065 spawn,
3066 time::{sleep, timeout},
3067 };
3068 use url::Url;
3069
3070 use super::Client;
3071 use crate::{
3072 client::{futures::SendMediaUploadRequest, WeakClient},
3073 config::{RequestConfig, SyncSettings},
3074 futures::SendRequest,
3075 media::MediaError,
3076 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3077 Error, TransmissionProgress,
3078 };
3079
3080 #[async_test]
3081 async fn test_account_data() {
3082 let server = MatrixMockServer::new().await;
3083 let client = server.client_builder().build().await;
3084
3085 let f = EventFactory::new();
3086 server
3087 .mock_sync()
3088 .ok_and_run(&client, |builder| {
3089 builder.add_global_account_data(
3090 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3091 );
3092 })
3093 .await;
3094
3095 let content = client
3096 .account()
3097 .account_data::<IgnoredUserListEventContent>()
3098 .await
3099 .unwrap()
3100 .unwrap()
3101 .deserialize()
3102 .unwrap();
3103
3104 assert_eq!(content.ignored_users.len(), 1);
3105 }
3106
3107 #[async_test]
3108 async fn test_successful_discovery() {
3109 // Imagine this is `matrix.org`.
3110 let server = MatrixMockServer::new().await;
3111 let server_url = server.uri();
3112
3113 // Imagine this is `matrix-client.matrix.org`.
3114 let homeserver = MatrixMockServer::new().await;
3115 let homeserver_url = homeserver.uri();
3116
3117 // Imagine Alice has the user ID `@alice:matrix.org`.
3118 let domain = server_url.strip_prefix("http://").unwrap();
3119 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3120
3121 // The `.well-known` is on the server (e.g. `matrix.org`).
3122 server
3123 .mock_well_known()
3124 .ok_with_homeserver_url(&homeserver_url)
3125 .mock_once()
3126 .named("well-known")
3127 .mount()
3128 .await;
3129
3130 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3131 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3132
3133 let client = Client::builder()
3134 .insecure_server_name_no_tls(alice.server_name())
3135 .build()
3136 .await
3137 .unwrap();
3138
3139 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3140 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3141 client.server_versions().await.unwrap();
3142 }
3143
3144 #[async_test]
3145 async fn test_discovery_broken_server() {
3146 let server = MatrixMockServer::new().await;
3147 let server_url = server.uri();
3148 let domain = server_url.strip_prefix("http://").unwrap();
3149 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3150
3151 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3152
3153 assert!(
3154 Client::builder()
3155 .insecure_server_name_no_tls(alice.server_name())
3156 .build()
3157 .await
3158 .is_err(),
3159 "Creating a client from a user ID should fail when the .well-known request fails."
3160 );
3161 }
3162
3163 #[async_test]
3164 async fn test_room_creation() {
3165 let server = MatrixMockServer::new().await;
3166 let client = server.client_builder().build().await;
3167
3168 server
3169 .mock_sync()
3170 .ok_and_run(&client, |builder| {
3171 builder.add_joined_room(
3172 JoinedRoomBuilder::default()
3173 .add_state_event(StateTestEvent::Member)
3174 .add_state_event(StateTestEvent::PowerLevels),
3175 );
3176 })
3177 .await;
3178
3179 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3180 assert_eq!(room.state(), RoomState::Joined);
3181 }
3182
3183 #[async_test]
3184 async fn test_retry_limit_http_requests() {
3185 let server = MatrixMockServer::new().await;
3186 let client = server
3187 .client_builder()
3188 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3189 .build()
3190 .await;
3191
3192 assert!(client.request_config().retry_limit.unwrap() == 3);
3193
3194 server.mock_login().error500().expect(3).mount().await;
3195
3196 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3197 }
3198
3199 #[async_test]
3200 async fn test_retry_timeout_http_requests() {
3201 // Keep this timeout small so that the test doesn't take long
3202 let retry_timeout = Duration::from_secs(5);
3203 let server = MatrixMockServer::new().await;
3204 let client = server
3205 .client_builder()
3206 .on_builder(|builder| {
3207 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3208 })
3209 .build()
3210 .await;
3211
3212 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3213
3214 server.mock_login().error500().expect(2..).mount().await;
3215
3216 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3217 }
3218
3219 #[async_test]
3220 async fn test_short_retry_initial_http_requests() {
3221 let server = MatrixMockServer::new().await;
3222 let client = server
3223 .client_builder()
3224 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3225 .build()
3226 .await;
3227
3228 server.mock_login().error500().expect(3..).mount().await;
3229
3230 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3231 }
3232
3233 #[async_test]
3234 async fn test_no_retry_http_requests() {
3235 let server = MatrixMockServer::new().await;
3236 let client = server.client_builder().build().await;
3237
3238 server.mock_devices().error500().mock_once().mount().await;
3239
3240 client.devices().await.unwrap_err();
3241 }
3242
3243 #[async_test]
3244 async fn test_set_homeserver() {
3245 let client = MockClientBuilder::new(None).build().await;
3246 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3247
3248 let homeserver = Url::parse("http://example.com/").unwrap();
3249 client.set_homeserver(homeserver.clone());
3250 assert_eq!(client.homeserver(), homeserver);
3251 }
3252
3253 #[async_test]
3254 async fn test_search_user_request() {
3255 let server = MatrixMockServer::new().await;
3256 let client = server.client_builder().build().await;
3257
3258 server.mock_user_directory().ok().mock_once().mount().await;
3259
3260 let response = client.search_users("test", 50).await.unwrap();
3261 assert_eq!(response.results.len(), 1);
3262 let result = &response.results[0];
3263 assert_eq!(result.user_id.to_string(), "@test:example.me");
3264 assert_eq!(result.display_name.clone().unwrap(), "Test");
3265 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3266 assert!(!response.limited);
3267 }
3268
3269 #[async_test]
3270 async fn test_request_unstable_features() {
3271 let server = MatrixMockServer::new().await;
3272 let client = server.client_builder().no_server_versions().build().await;
3273
3274 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3275
3276 let unstable_features = client.unstable_features().await.unwrap();
3277 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3278 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3279 }
3280
3281 #[async_test]
3282 async fn test_can_homeserver_push_encrypted_event_to_device() {
3283 let server = MatrixMockServer::new().await;
3284 let client = server.client_builder().no_server_versions().build().await;
3285
3286 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3287
3288 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3289 assert!(msc4028_enabled);
3290 }
3291
3292 #[async_test]
3293 async fn test_recently_visited_rooms() {
3294 // Tracking recently visited rooms requires authentication
3295 let client = MockClientBuilder::new(None).unlogged().build().await;
3296 assert_matches!(
3297 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3298 Err(Error::AuthenticationRequired)
3299 );
3300
3301 let client = MockClientBuilder::new(None).build().await;
3302 let account = client.account();
3303
3304 // We should start off with an empty list
3305 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3306
3307 // Tracking a valid room id should add it to the list
3308 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3309 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3310 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3311
3312 // And the existing list shouldn't be changed
3313 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3314 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3315
3316 // Tracking the same room again shouldn't change the list
3317 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3318 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3319 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3320
3321 // Tracking a second room should add it to the front of the list
3322 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3323 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3324 assert_eq!(
3325 account.get_recently_visited_rooms().await.unwrap(),
3326 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3327 );
3328
3329 // Tracking the first room yet again should move it to the front of the list
3330 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3331 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3332 assert_eq!(
3333 account.get_recently_visited_rooms().await.unwrap(),
3334 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3335 );
3336
3337 // Tracking should be capped at 20
3338 for n in 0..20 {
3339 account
3340 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3341 .await
3342 .unwrap();
3343 }
3344
3345 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3346
3347 // And the initial rooms should've been pushed out
3348 let rooms = account.get_recently_visited_rooms().await.unwrap();
3349 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3350 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3351
3352 // And the last tracked room should be the first
3353 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3354 }
3355
3356 #[async_test]
3357 async fn test_client_no_cycle_with_event_cache() {
3358 let client = MockClientBuilder::new(None).build().await;
3359
3360 // Wait for the init tasks to die.
3361 sleep(Duration::from_secs(1)).await;
3362
3363 let weak_client = WeakClient::from_client(&client);
3364 assert_eq!(weak_client.strong_count(), 1);
3365
3366 {
3367 let room_id = room_id!("!room:example.org");
3368
3369 // Have the client know the room.
3370 let response = SyncResponseBuilder::default()
3371 .add_joined_room(JoinedRoomBuilder::new(room_id))
3372 .build_sync_response();
3373 client.inner.base_client.receive_sync_response(response).await.unwrap();
3374
3375 client.event_cache().subscribe().unwrap();
3376
3377 let (_room_event_cache, _drop_handles) =
3378 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3379 }
3380
3381 drop(client);
3382
3383 // Give a bit of time for background tasks to die.
3384 sleep(Duration::from_secs(1)).await;
3385
3386 // The weak client must be the last reference to the client now.
3387 assert_eq!(weak_client.strong_count(), 0);
3388 let client = weak_client.get();
3389 assert!(
3390 client.is_none(),
3391 "too many strong references to the client: {}",
3392 Arc::strong_count(&client.unwrap().inner)
3393 );
3394 }
3395
3396 #[async_test]
3397 async fn test_server_info_caching() {
3398 let server = MatrixMockServer::new().await;
3399 let server_url = server.uri();
3400 let domain = server_url.strip_prefix("http://").unwrap();
3401 let server_name = <&ServerName>::try_from(domain).unwrap();
3402 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3403
3404 let well_known_mock = server
3405 .mock_well_known()
3406 .ok()
3407 .named("well known mock")
3408 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3409 .mount_as_scoped()
3410 .await;
3411
3412 let versions_mock = server
3413 .mock_versions()
3414 .ok_with_unstable_features()
3415 .named("first versions mock")
3416 .expect(1)
3417 .mount_as_scoped()
3418 .await;
3419
3420 let memory_store = Arc::new(MemoryStore::new());
3421 let client = Client::builder()
3422 .insecure_server_name_no_tls(server_name)
3423 .store_config(
3424 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3425 .state_store(memory_store.clone()),
3426 )
3427 .build()
3428 .await
3429 .unwrap();
3430
3431 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3432
3433 // These subsequent calls hit the in-memory cache.
3434 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3435 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3436
3437 drop(client);
3438
3439 let client = server
3440 .client_builder()
3441 .no_server_versions()
3442 .on_builder(|builder| {
3443 builder.store_config(
3444 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3445 .state_store(memory_store.clone()),
3446 )
3447 })
3448 .build()
3449 .await;
3450
3451 // These calls to the new client hit the on-disk cache.
3452 assert!(client
3453 .unstable_features()
3454 .await
3455 .unwrap()
3456 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3457 let supported = client.supported_versions().await.unwrap();
3458 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3459 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3460
3461 // Then this call hits the in-memory cache.
3462 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3463
3464 drop(versions_mock);
3465 drop(well_known_mock);
3466
3467 // Now, reset the cache, and observe the endpoints being called again once.
3468 client.reset_server_info().await.unwrap();
3469
3470 server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3471
3472 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3473
3474 // Hits network again.
3475 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3476 // Hits in-memory cache again.
3477 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3478 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3479 }
3480
3481 #[async_test]
3482 async fn test_server_info_without_a_well_known() {
3483 let server = MatrixMockServer::new().await;
3484 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3485
3486 let versions_mock = server
3487 .mock_versions()
3488 .ok_with_unstable_features()
3489 .named("first versions mock")
3490 .expect(1)
3491 .mount_as_scoped()
3492 .await;
3493
3494 let memory_store = Arc::new(MemoryStore::new());
3495 let client = server
3496 .client_builder()
3497 .no_server_versions()
3498 .on_builder(|builder| {
3499 builder.store_config(
3500 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3501 .state_store(memory_store.clone()),
3502 )
3503 })
3504 .build()
3505 .await;
3506
3507 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3508
3509 // These subsequent calls hit the in-memory cache.
3510 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3511 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3512
3513 drop(client);
3514
3515 let client = server
3516 .client_builder()
3517 .no_server_versions()
3518 .on_builder(|builder| {
3519 builder.store_config(
3520 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3521 .state_store(memory_store.clone()),
3522 )
3523 })
3524 .build()
3525 .await;
3526
3527 // This call to the new client hits the on-disk cache.
3528 assert!(client
3529 .unstable_features()
3530 .await
3531 .unwrap()
3532 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3533
3534 // Then this call hits the in-memory cache.
3535 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3536
3537 drop(versions_mock);
3538
3539 // Now, reset the cache, and observe the endpoints being called again once.
3540 client.reset_server_info().await.unwrap();
3541
3542 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3543
3544 // Hits network again.
3545 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3546 // Hits in-memory cache again.
3547 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3548 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3549 }
3550
3551 #[async_test]
3552 async fn test_no_network_doesnt_cause_infinite_retries() {
3553 // We want infinite retries for transient errors.
3554 let client = MockClientBuilder::new(None)
3555 .on_builder(|builder| builder.request_config(RequestConfig::new()))
3556 .build()
3557 .await;
3558
3559 // We don't define a mock server on purpose here, so that the error is really a
3560 // network error.
3561 client.whoami().await.unwrap_err();
3562 }
3563
3564 #[async_test]
3565 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3566 let server = MatrixMockServer::new().await;
3567 let client = server.client_builder().build().await;
3568
3569 let room_id = room_id!("!room:example.org");
3570
3571 server
3572 .mock_sync()
3573 .ok_and_run(&client, |builder| {
3574 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3575 })
3576 .await;
3577
3578 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3579 assert_eq!(room.room_id(), room_id);
3580 }
3581
3582 #[async_test]
3583 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3584 let server = MatrixMockServer::new().await;
3585 let client = server.client_builder().build().await;
3586
3587 let room_id = room_id!("!room:example.org");
3588
3589 let client = Arc::new(client);
3590
3591 // Perform the /sync request with a delay so it starts after the
3592 // `await_room_remote_echo` call has happened
3593 spawn({
3594 let client = client.clone();
3595 async move {
3596 sleep(Duration::from_millis(100)).await;
3597
3598 server
3599 .mock_sync()
3600 .ok_and_run(&client, |builder| {
3601 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3602 })
3603 .await;
3604 }
3605 });
3606
3607 let room =
3608 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3609 assert_eq!(room.room_id(), room_id);
3610 }
3611
3612 #[async_test]
3613 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3614 let client = MockClientBuilder::new(None).build().await;
3615
3616 let room_id = room_id!("!room:example.org");
3617 // Room is not present so the client won't be able to find it. The call will
3618 // timeout.
3619 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3620 }
3621
3622 #[async_test]
3623 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3624 let server = MatrixMockServer::new().await;
3625 let client = server.client_builder().build().await;
3626
3627 server.mock_create_room().ok().mount().await;
3628
3629 // Create a room in the internal store
3630 let room = client
3631 .create_room(assign!(CreateRoomRequest::new(), {
3632 invite: vec![],
3633 is_direct: false,
3634 }))
3635 .await
3636 .unwrap();
3637
3638 // Room is locally present, but not synced, the call will timeout
3639 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3640 .await
3641 .unwrap_err();
3642 }
3643
3644 #[async_test]
3645 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3646 let server = MatrixMockServer::new().await;
3647 let client = server.client_builder().build().await;
3648
3649 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3650
3651 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3652 assert_matches!(ret, Ok(true));
3653 }
3654
3655 #[async_test]
3656 async fn test_is_room_alias_available_if_alias_is_resolved() {
3657 let server = MatrixMockServer::new().await;
3658 let client = server.client_builder().build().await;
3659
3660 server
3661 .mock_room_directory_resolve_alias()
3662 .ok("!some_room_id:matrix.org", Vec::new())
3663 .expect(1)
3664 .mount()
3665 .await;
3666
3667 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3668 assert_matches!(ret, Ok(false));
3669 }
3670
3671 #[async_test]
3672 async fn test_is_room_alias_available_if_error_found() {
3673 let server = MatrixMockServer::new().await;
3674 let client = server.client_builder().build().await;
3675
3676 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3677
3678 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3679 assert_matches!(ret, Err(_));
3680 }
3681
3682 #[async_test]
3683 async fn test_create_room_alias() {
3684 let server = MatrixMockServer::new().await;
3685 let client = server.client_builder().build().await;
3686
3687 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3688
3689 let ret = client
3690 .create_room_alias(
3691 room_alias_id!("#some_alias:matrix.org"),
3692 room_id!("!some_room:matrix.org"),
3693 )
3694 .await;
3695 assert_matches!(ret, Ok(()));
3696 }
3697
3698 #[async_test]
3699 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3700 let server = MatrixMockServer::new().await;
3701 let client = server.client_builder().build().await;
3702
3703 let room_id = room_id!("!a-room:matrix.org");
3704
3705 // Make sure the summary endpoint is called once
3706 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3707
3708 // We create a locally cached invited room
3709 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3710
3711 // And we get a preview, the server endpoint was reached
3712 let preview = client
3713 .get_room_preview(room_id.into(), Vec::new())
3714 .await
3715 .expect("Room preview should be retrieved");
3716
3717 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3718 }
3719
3720 #[async_test]
3721 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3722 let server = MatrixMockServer::new().await;
3723 let client = server.client_builder().build().await;
3724
3725 let room_id = room_id!("!a-room:matrix.org");
3726
3727 // Make sure the summary endpoint is called once
3728 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3729
3730 // We create a locally cached left room
3731 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3732
3733 // And we get a preview, the server endpoint was reached
3734 let preview = client
3735 .get_room_preview(room_id.into(), Vec::new())
3736 .await
3737 .expect("Room preview should be retrieved");
3738
3739 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3740 }
3741
3742 #[async_test]
3743 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3744 let server = MatrixMockServer::new().await;
3745 let client = server.client_builder().build().await;
3746
3747 let room_id = room_id!("!a-room:matrix.org");
3748
3749 // Make sure the summary endpoint is called once
3750 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3751
3752 // We create a locally cached knocked room
3753 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3754
3755 // And we get a preview, the server endpoint was reached
3756 let preview = client
3757 .get_room_preview(room_id.into(), Vec::new())
3758 .await
3759 .expect("Room preview should be retrieved");
3760
3761 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3762 }
3763
3764 #[async_test]
3765 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3766 let server = MatrixMockServer::new().await;
3767 let client = server.client_builder().build().await;
3768
3769 let room_id = room_id!("!a-room:matrix.org");
3770
3771 // Make sure the summary endpoint is not called
3772 server.mock_room_summary().ok(room_id).never().mount().await;
3773
3774 // We create a locally cached joined room
3775 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3776
3777 // And we get a preview, no server endpoint was reached
3778 let preview = client
3779 .get_room_preview(room_id.into(), Vec::new())
3780 .await
3781 .expect("Room preview should be retrieved");
3782
3783 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3784 }
3785
3786 #[async_test]
3787 async fn test_media_preview_config() {
3788 let server = MatrixMockServer::new().await;
3789 let client = server.client_builder().build().await;
3790
3791 server
3792 .mock_sync()
3793 .ok_and_run(&client, |builder| {
3794 builder.add_custom_global_account_data(json!({
3795 "content": {
3796 "media_previews": "private",
3797 "invite_avatars": "off"
3798 },
3799 "type": "m.media_preview_config"
3800 }));
3801 })
3802 .await;
3803
3804 let (initial_value, stream) =
3805 client.account().observe_media_preview_config().await.unwrap();
3806
3807 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3808 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3809 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3810 pin_mut!(stream);
3811 assert_pending!(stream);
3812
3813 server
3814 .mock_sync()
3815 .ok_and_run(&client, |builder| {
3816 builder.add_custom_global_account_data(json!({
3817 "content": {
3818 "media_previews": "off",
3819 "invite_avatars": "on"
3820 },
3821 "type": "m.media_preview_config"
3822 }));
3823 })
3824 .await;
3825
3826 assert_next_matches!(
3827 stream,
3828 MediaPreviewConfigEventContent {
3829 media_previews: Some(MediaPreviews::Off),
3830 invite_avatars: Some(InviteAvatars::On),
3831 ..
3832 }
3833 );
3834 assert_pending!(stream);
3835 }
3836
3837 #[async_test]
3838 async fn test_unstable_media_preview_config() {
3839 let server = MatrixMockServer::new().await;
3840 let client = server.client_builder().build().await;
3841
3842 server
3843 .mock_sync()
3844 .ok_and_run(&client, |builder| {
3845 builder.add_custom_global_account_data(json!({
3846 "content": {
3847 "media_previews": "private",
3848 "invite_avatars": "off"
3849 },
3850 "type": "io.element.msc4278.media_preview_config"
3851 }));
3852 })
3853 .await;
3854
3855 let (initial_value, stream) =
3856 client.account().observe_media_preview_config().await.unwrap();
3857
3858 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3859 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3860 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3861 pin_mut!(stream);
3862 assert_pending!(stream);
3863
3864 server
3865 .mock_sync()
3866 .ok_and_run(&client, |builder| {
3867 builder.add_custom_global_account_data(json!({
3868 "content": {
3869 "media_previews": "off",
3870 "invite_avatars": "on"
3871 },
3872 "type": "io.element.msc4278.media_preview_config"
3873 }));
3874 })
3875 .await;
3876
3877 assert_next_matches!(
3878 stream,
3879 MediaPreviewConfigEventContent {
3880 media_previews: Some(MediaPreviews::Off),
3881 invite_avatars: Some(InviteAvatars::On),
3882 ..
3883 }
3884 );
3885 assert_pending!(stream);
3886 }
3887
3888 #[async_test]
3889 async fn test_media_preview_config_not_found() {
3890 let server = MatrixMockServer::new().await;
3891 let client = server.client_builder().build().await;
3892
3893 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3894
3895 assert!(initial_value.is_none());
3896 }
3897
3898 #[async_test]
3899 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3900 // The default Matrix version we use is 1.11 or higher, so authenticated media
3901 // is supported.
3902 let server = MatrixMockServer::new().await;
3903 let client = server.client_builder().build().await;
3904
3905 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3906
3907 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3908 client.load_or_fetch_max_upload_size().await.unwrap();
3909
3910 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3911 }
3912
3913 #[async_test]
3914 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3915 // The server must advertise support for the stable feature for authenticated
3916 // media support, so we mock the `GET /versions` response.
3917 let server = MatrixMockServer::new().await;
3918 let client = server.client_builder().no_server_versions().build().await;
3919
3920 server
3921 .mock_versions()
3922 .ok_custom(
3923 &["v1.7", "v1.8", "v1.9", "v1.10"],
3924 &[("org.matrix.msc3916.stable", true)].into(),
3925 )
3926 .named("versions")
3927 .expect(1)
3928 .mount()
3929 .await;
3930
3931 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3932
3933 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3934 client.load_or_fetch_max_upload_size().await.unwrap();
3935
3936 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3937 }
3938
3939 #[async_test]
3940 async fn test_load_or_fetch_max_upload_size_no_auth() {
3941 // The server must not support Matrix 1.11 or higher for unauthenticated
3942 // media requests, so we mock the `GET /versions` response.
3943 let server = MatrixMockServer::new().await;
3944 let client = server.client_builder().no_server_versions().build().await;
3945
3946 server
3947 .mock_versions()
3948 .ok_custom(&["v1.1"], &Default::default())
3949 .named("versions")
3950 .expect(1)
3951 .mount()
3952 .await;
3953
3954 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3955
3956 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
3957 client.load_or_fetch_max_upload_size().await.unwrap();
3958
3959 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3960 }
3961
3962 #[async_test]
3963 async fn test_uploading_a_too_large_media_file() {
3964 let server = MatrixMockServer::new().await;
3965 let client = server.client_builder().build().await;
3966
3967 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3968 client.load_or_fetch_max_upload_size().await.unwrap();
3969 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3970
3971 let data = vec![1, 2];
3972 let upload_request =
3973 ruma::api::client::media::create_content::v3::Request::new(data.clone());
3974 let request = SendRequest {
3975 client: client.clone(),
3976 request: upload_request,
3977 config: None,
3978 send_progress: SharedObservable::new(TransmissionProgress::default()),
3979 };
3980 let media_request = SendMediaUploadRequest::new(request);
3981
3982 let error = media_request.await.err();
3983 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
3984 assert_eq!(max, uint!(1));
3985 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
3986 }
3987
3988 #[async_test]
3989 async fn test_dont_ignore_timeout_on_first_sync() {
3990 let server = MatrixMockServer::new().await;
3991 let client = server.client_builder().build().await;
3992
3993 server
3994 .mock_sync()
3995 .timeout(Some(Duration::from_secs(30)))
3996 .ok(|_| {})
3997 .mock_once()
3998 .named("sync_with_timeout")
3999 .mount()
4000 .await;
4001
4002 // Call the endpoint once to check the timeout.
4003 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4004
4005 timeout(Duration::from_secs(1), async {
4006 stream.next().await.unwrap().unwrap();
4007 })
4008 .await
4009 .unwrap();
4010 }
4011
4012 #[async_test]
4013 async fn test_ignore_timeout_on_first_sync() {
4014 let server = MatrixMockServer::new().await;
4015 let client = server.client_builder().build().await;
4016
4017 server
4018 .mock_sync()
4019 .timeout(None)
4020 .ok(|_| {})
4021 .mock_once()
4022 .named("sync_no_timeout")
4023 .mount()
4024 .await;
4025 server
4026 .mock_sync()
4027 .timeout(Some(Duration::from_secs(30)))
4028 .ok(|_| {})
4029 .mock_once()
4030 .named("sync_with_timeout")
4031 .mount()
4032 .await;
4033
4034 // Call each version of the endpoint once to check the timeouts.
4035 let mut stream = Box::pin(
4036 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4037 );
4038
4039 timeout(Duration::from_secs(1), async {
4040 stream.next().await.unwrap().unwrap();
4041 stream.next().await.unwrap().unwrap();
4042 })
4043 .await
4044 .unwrap();
4045 }
4046}