matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
33use matrix_sdk_base::{
34 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
35 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
36 event_cache::store::EventCacheStoreLock,
37 media::store::MediaStoreLock,
38 store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
39 sync::{Notification, RoomUpdates},
40};
41use matrix_sdk_common::ttl_cache::TtlCache;
42#[cfg(feature = "e2e-encryption")]
43use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
44use ruma::{
45 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
46 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
47 api::{
48 FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
49 client::{
50 account::whoami,
51 alias::{create_alias, delete_alias, get_alias},
52 authenticated_media,
53 device::{delete_devices, get_devices, update_device},
54 directory::{get_public_rooms, get_public_rooms_filtered},
55 discovery::{
56 discover_homeserver::{self, RtcFocusInfo},
57 get_capabilities::{self, v3::Capabilities},
58 get_supported_versions,
59 },
60 error::ErrorKind,
61 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
62 knock::knock_room,
63 media,
64 membership::{join_room_by_id, join_room_by_id_or_alias},
65 room::create_room,
66 session::login::v3::DiscoveryInfo,
67 sync::sync_events,
68 threads::get_thread_subscriptions_changes,
69 uiaa,
70 user_directory::search_users,
71 },
72 error::FromHttpResponseError,
73 federation::discovery::get_server_version,
74 },
75 assign,
76 push::Ruleset,
77 time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, instrument, trace, warn};
82use url::Url;
83
84use self::futures::SendRequest;
85use crate::{
86 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
87 Room, SessionTokens, TransmissionProgress,
88 authentication::{
89 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
90 oauth::OAuth,
91 },
92 client::thread_subscriptions::ThreadSubscriptionCatchup,
93 config::{RequestConfig, SyncToken},
94 deduplicating_handler::DeduplicatingHandler,
95 error::HttpResult,
96 event_cache::EventCache,
97 event_handler::{
98 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
99 EventHandlerStore, ObservableEventHandler, SyncEvent,
100 },
101 http_client::HttpClient,
102 latest_events::LatestEvents,
103 media::MediaError,
104 notification_settings::NotificationSettings,
105 room::RoomMember,
106 room_preview::RoomPreview,
107 send_queue::{SendQueue, SendQueueData},
108 sliding_sync::Version as SlidingSyncVersion,
109 sync::{RoomUpdate, SyncResponse},
110};
111#[cfg(feature = "e2e-encryption")]
112use crate::{
113 cross_process_lock::CrossProcessLock,
114 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
115};
116
117mod builder;
118pub(crate) mod caches;
119pub(crate) mod futures;
120pub(crate) mod thread_subscriptions;
121
122pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
123#[cfg(feature = "experimental-search")]
124use crate::search_index::SearchIndex;
125
126#[cfg(not(target_family = "wasm"))]
127type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
128#[cfg(target_family = "wasm")]
129type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
130
131#[cfg(not(target_family = "wasm"))]
132type NotificationHandlerFn =
133 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
134#[cfg(target_family = "wasm")]
135type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
136
137/// Enum controlling if a loop running callbacks should continue or abort.
138///
139/// This is mainly used in the [`sync_with_callback`] method, the return value
140/// of the provided callback controls if the sync loop should be exited.
141///
142/// [`sync_with_callback`]: #method.sync_with_callback
143#[derive(Debug, Clone, Copy, PartialEq, Eq)]
144pub enum LoopCtrl {
145 /// Continue running the loop.
146 Continue,
147 /// Break out of the loop.
148 Break,
149}
150
151/// Represents changes that can occur to a `Client`s `Session`.
152#[derive(Debug, Clone, PartialEq)]
153pub enum SessionChange {
154 /// The session's token is no longer valid.
155 UnknownToken {
156 /// Whether or not the session was soft logged out
157 soft_logout: bool,
158 },
159 /// The session's tokens have been refreshed.
160 TokensRefreshed,
161}
162
163/// Information about the server vendor obtained from the federation API.
164#[derive(Debug, Clone, PartialEq, Eq)]
165#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
166pub struct ServerVendorInfo {
167 /// The server name.
168 pub server_name: String,
169 /// The server version.
170 pub version: String,
171}
172
173/// An async/await enabled Matrix client.
174///
175/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
176#[derive(Clone)]
177pub struct Client {
178 pub(crate) inner: Arc<ClientInner>,
179}
180
181#[derive(Default)]
182pub(crate) struct ClientLocks {
183 /// Lock ensuring that only a single room may be marked as a DM at once.
184 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
185 /// explanation.
186 pub(crate) mark_as_dm_lock: Mutex<()>,
187
188 /// Lock ensuring that only a single secret store is getting opened at the
189 /// same time.
190 ///
191 /// This is important so we don't accidentally create multiple different new
192 /// default secret storage keys.
193 #[cfg(feature = "e2e-encryption")]
194 pub(crate) open_secret_store_lock: Mutex<()>,
195
196 /// Lock ensuring that we're only storing a single secret at a time.
197 ///
198 /// Take a look at the [`SecretStore::put_secret`] method for a more
199 /// detailed explanation.
200 ///
201 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
202 #[cfg(feature = "e2e-encryption")]
203 pub(crate) store_secret_lock: Mutex<()>,
204
205 /// Lock ensuring that only one method at a time might modify our backup.
206 #[cfg(feature = "e2e-encryption")]
207 pub(crate) backup_modify_lock: Mutex<()>,
208
209 /// Lock ensuring that we're going to attempt to upload backups for a single
210 /// requester.
211 #[cfg(feature = "e2e-encryption")]
212 pub(crate) backup_upload_lock: Mutex<()>,
213
214 /// Handler making sure we only have one group session sharing request in
215 /// flight per room.
216 #[cfg(feature = "e2e-encryption")]
217 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
218
219 /// Lock making sure we're only doing one key claim request at a time.
220 #[cfg(feature = "e2e-encryption")]
221 pub(crate) key_claim_lock: Mutex<()>,
222
223 /// Handler to ensure that only one members request is running at a time,
224 /// given a room.
225 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
226
227 /// Handler to ensure that only one encryption state request is running at a
228 /// time, given a room.
229 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
230
231 /// Deduplicating handler for sending read receipts. The string is an
232 /// internal implementation detail, see [`Self::send_single_receipt`].
233 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
234
235 #[cfg(feature = "e2e-encryption")]
236 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
237
238 /// Latest "generation" of data known by the crypto store.
239 ///
240 /// This is a counter that only increments, set in the database (and can
241 /// wrap). It's incremented whenever some process acquires a lock for the
242 /// first time. *This assumes the crypto store lock is being held, to
243 /// avoid data races on writing to this value in the store*.
244 ///
245 /// The current process will maintain this value in local memory and in the
246 /// DB over time. Observing a different value than the one read in
247 /// memory, when reading from the store indicates that somebody else has
248 /// written into the database under our feet.
249 ///
250 /// TODO: this should live in the `OlmMachine`, since it's information
251 /// related to the lock. As of today (2023-07-28), we blow up the entire
252 /// olm machine when there's a generation mismatch. So storing the
253 /// generation in the olm machine would make the client think there's
254 /// *always* a mismatch, and that's why we need to store the generation
255 /// outside the `OlmMachine`.
256 #[cfg(feature = "e2e-encryption")]
257 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
258}
259
260pub(crate) struct ClientInner {
261 /// All the data related to authentication and authorization.
262 pub(crate) auth_ctx: Arc<AuthCtx>,
263
264 /// The URL of the server.
265 ///
266 /// Not to be confused with the `Self::homeserver`. `server` is usually
267 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
268 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
269 /// homeserver (at the time of writing — 2024-08-28).
270 ///
271 /// This value is optional depending on how the `Client` has been built.
272 /// If it's been built from a homeserver URL directly, we don't know the
273 /// server. However, if the `Client` has been built from a server URL or
274 /// name, then the homeserver has been discovered, and we know both.
275 server: Option<Url>,
276
277 /// The URL of the homeserver to connect to.
278 ///
279 /// This is the URL for the client-server Matrix API.
280 homeserver: StdRwLock<Url>,
281
282 /// The sliding sync version.
283 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
284
285 /// The underlying HTTP client.
286 pub(crate) http_client: HttpClient,
287
288 /// User session data.
289 pub(super) base_client: BaseClient,
290
291 /// Collection of in-memory caches for the [`Client`].
292 pub(crate) caches: ClientCaches,
293
294 /// Collection of locks individual client methods might want to use, either
295 /// to ensure that only a single call to a method happens at once or to
296 /// deduplicate multiple calls to a method.
297 pub(crate) locks: ClientLocks,
298
299 /// The cross-process store locks holder name.
300 ///
301 /// The SDK provides cross-process store locks (see
302 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
303 /// `holder_name` is the value used for all cross-process store locks
304 /// used by this `Client`.
305 ///
306 /// If multiple `Client`s are running in different processes, this
307 /// value MUST be different for each `Client`.
308 cross_process_store_locks_holder_name: String,
309
310 /// A mapping of the times at which the current user sent typing notices,
311 /// keyed by room.
312 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
313
314 /// Event handlers. See `add_event_handler`.
315 pub(crate) event_handlers: EventHandlerStore,
316
317 /// Notification handlers. See `register_notification_handler`.
318 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
319
320 /// The sender-side of channels used to receive room updates.
321 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
322
323 /// The sender-side of a channel used to observe all the room updates of a
324 /// sync response.
325 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
326
327 /// Whether the client should update its homeserver URL with the discovery
328 /// information present in the login response.
329 respect_login_well_known: bool,
330
331 /// An event that can be listened on to wait for a successful sync. The
332 /// event will only be fired if a sync loop is running. Can be used for
333 /// synchronization, e.g. if we send out a request to create a room, we can
334 /// wait for the sync to get the data to fetch a room object from the state
335 /// store.
336 pub(crate) sync_beat: event_listener::Event,
337
338 /// A central cache for events, inactive first.
339 ///
340 /// It becomes active when [`EventCache::subscribe`] is called.
341 pub(crate) event_cache: OnceCell<EventCache>,
342
343 /// End-to-end encryption related state.
344 #[cfg(feature = "e2e-encryption")]
345 pub(crate) e2ee: EncryptionData,
346
347 /// The verification state of our own device.
348 #[cfg(feature = "e2e-encryption")]
349 pub(crate) verification_state: SharedObservable<VerificationState>,
350
351 /// Whether to enable the experimental support for sending and receiving
352 /// encrypted room history on invite, per [MSC4268].
353 ///
354 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
355 #[cfg(feature = "e2e-encryption")]
356 pub(crate) enable_share_history_on_invite: bool,
357
358 /// Data related to the [`SendQueue`].
359 ///
360 /// [`SendQueue`]: crate::send_queue::SendQueue
361 pub(crate) send_queue_data: Arc<SendQueueData>,
362
363 /// The `max_upload_size` value of the homeserver, it contains the max
364 /// request size you can send.
365 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
366
367 /// The entry point to get the [`LatestEvent`] of rooms and threads.
368 ///
369 /// [`LatestEvent`]: crate::latest_event::LatestEvent
370 latest_events: OnceCell<LatestEvents>,
371
372 /// Service handling the catching up of thread subscriptions in the
373 /// background.
374 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
375
376 #[cfg(feature = "experimental-search")]
377 /// Handler for [`RoomIndex`]'s of each room
378 search_index: SearchIndex,
379}
380
381impl ClientInner {
382 /// Create a new `ClientInner`.
383 ///
384 /// All the fields passed as parameters here are those that must be cloned
385 /// upon instantiation of a sub-client, e.g. a client specialized for
386 /// notifications.
387 #[allow(clippy::too_many_arguments)]
388 async fn new(
389 auth_ctx: Arc<AuthCtx>,
390 server: Option<Url>,
391 homeserver: Url,
392 sliding_sync_version: SlidingSyncVersion,
393 http_client: HttpClient,
394 base_client: BaseClient,
395 server_info: ClientServerInfo,
396 respect_login_well_known: bool,
397 event_cache: OnceCell<EventCache>,
398 send_queue: Arc<SendQueueData>,
399 latest_events: OnceCell<LatestEvents>,
400 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
401 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
402 cross_process_store_locks_holder_name: String,
403 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
404 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
405 ) -> Arc<Self> {
406 let caches = ClientCaches {
407 server_info: server_info.into(),
408 server_metadata: Mutex::new(TtlCache::new()),
409 };
410
411 let client = Self {
412 server,
413 homeserver: StdRwLock::new(homeserver),
414 auth_ctx,
415 sliding_sync_version: StdRwLock::new(sliding_sync_version),
416 http_client,
417 base_client,
418 caches,
419 locks: Default::default(),
420 cross_process_store_locks_holder_name,
421 typing_notice_times: Default::default(),
422 event_handlers: Default::default(),
423 notification_handlers: Default::default(),
424 room_update_channels: Default::default(),
425 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
426 // ballast for all observers to catch up.
427 room_updates_sender: broadcast::Sender::new(32),
428 respect_login_well_known,
429 sync_beat: event_listener::Event::new(),
430 event_cache,
431 send_queue_data: send_queue,
432 latest_events,
433 #[cfg(feature = "e2e-encryption")]
434 e2ee: EncryptionData::new(encryption_settings),
435 #[cfg(feature = "e2e-encryption")]
436 verification_state: SharedObservable::new(VerificationState::Unknown),
437 #[cfg(feature = "e2e-encryption")]
438 enable_share_history_on_invite,
439 server_max_upload_size: Mutex::new(OnceCell::new()),
440 #[cfg(feature = "experimental-search")]
441 search_index: search_index_handler,
442 thread_subscription_catchup,
443 };
444
445 #[allow(clippy::let_and_return)]
446 let client = Arc::new(client);
447
448 #[cfg(feature = "e2e-encryption")]
449 client.e2ee.initialize_tasks(&client);
450
451 let _ = client
452 .event_cache
453 .get_or_init(|| async {
454 EventCache::new(
455 WeakClient::from_inner(&client),
456 client.base_client.event_cache_store().clone(),
457 )
458 })
459 .await;
460
461 let _ = client
462 .thread_subscription_catchup
463 .get_or_init(|| async {
464 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
465 })
466 .await;
467
468 client
469 }
470}
471
472#[cfg(not(tarpaulin_include))]
473impl Debug for Client {
474 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
475 write!(fmt, "Client")
476 }
477}
478
479impl Client {
480 /// Create a new [`Client`] that will use the given homeserver.
481 ///
482 /// # Arguments
483 ///
484 /// * `homeserver_url` - The homeserver that the client should connect to.
485 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
486 Self::builder().homeserver_url(homeserver_url).build().await
487 }
488
489 /// Returns a subscriber that publishes an event every time the ignore user
490 /// list changes.
491 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
492 self.inner.base_client.subscribe_to_ignore_user_list_changes()
493 }
494
495 /// Create a new [`ClientBuilder`].
496 pub fn builder() -> ClientBuilder {
497 ClientBuilder::new()
498 }
499
500 pub(crate) fn base_client(&self) -> &BaseClient {
501 &self.inner.base_client
502 }
503
504 /// The underlying HTTP client.
505 pub fn http_client(&self) -> &reqwest::Client {
506 &self.inner.http_client.inner
507 }
508
509 pub(crate) fn locks(&self) -> &ClientLocks {
510 &self.inner.locks
511 }
512
513 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
514 &self.inner.auth_ctx
515 }
516
517 /// The cross-process store locks holder name.
518 ///
519 /// The SDK provides cross-process store locks (see
520 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
521 /// `holder_name` is the value used for all cross-process store locks
522 /// used by this `Client`.
523 pub fn cross_process_store_locks_holder_name(&self) -> &str {
524 &self.inner.cross_process_store_locks_holder_name
525 }
526
527 /// Change the homeserver URL used by this client.
528 ///
529 /// # Arguments
530 ///
531 /// * `homeserver_url` - The new URL to use.
532 fn set_homeserver(&self, homeserver_url: Url) {
533 *self.inner.homeserver.write().unwrap() = homeserver_url;
534 }
535
536 /// Get the capabilities of the homeserver.
537 ///
538 /// This method should be used to check what features are supported by the
539 /// homeserver.
540 ///
541 /// # Examples
542 ///
543 /// ```no_run
544 /// # use matrix_sdk::Client;
545 /// # use url::Url;
546 /// # async {
547 /// # let homeserver = Url::parse("http://example.com")?;
548 /// let client = Client::new(homeserver).await?;
549 ///
550 /// let capabilities = client.get_capabilities().await?;
551 ///
552 /// if capabilities.change_password.enabled {
553 /// // Change password
554 /// }
555 /// # anyhow::Ok(()) };
556 /// ```
557 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
558 let res = self.send(get_capabilities::v3::Request::new()).await?;
559 Ok(res.capabilities)
560 }
561
562 /// Get the server vendor information from the federation API.
563 ///
564 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
565 /// both the server's software name and version.
566 ///
567 /// # Examples
568 ///
569 /// ```no_run
570 /// # use matrix_sdk::Client;
571 /// # use url::Url;
572 /// # async {
573 /// # let homeserver = Url::parse("http://example.com")?;
574 /// let client = Client::new(homeserver).await?;
575 ///
576 /// let server_info = client.server_vendor_info(None).await?;
577 /// println!(
578 /// "Server: {}, Version: {}",
579 /// server_info.server_name, server_info.version
580 /// );
581 /// # anyhow::Ok(()) };
582 /// ```
583 pub async fn server_vendor_info(
584 &self,
585 request_config: Option<RequestConfig>,
586 ) -> HttpResult<ServerVendorInfo> {
587 let res = self
588 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
589 .await?;
590
591 // Extract server info, using defaults if fields are missing.
592 let server = res.server.unwrap_or_default();
593 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
594 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
595
596 Ok(ServerVendorInfo { server_name: server_name_str, version })
597 }
598
599 /// Get a copy of the default request config.
600 ///
601 /// The default request config is what's used when sending requests if no
602 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
603 /// function with such a parameter.
604 ///
605 /// If the default request config was not customized through
606 /// [`ClientBuilder`] when creating this `Client`, the returned value will
607 /// be equivalent to [`RequestConfig::default()`].
608 pub fn request_config(&self) -> RequestConfig {
609 self.inner.http_client.request_config
610 }
611
612 /// Check whether the client has been activated.
613 ///
614 /// A client is considered active when:
615 ///
616 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
617 /// is logged in,
618 /// 2. Has loaded cached data from storage,
619 /// 3. If encryption is enabled, it also initialized or restored its
620 /// `OlmMachine`.
621 pub fn is_active(&self) -> bool {
622 self.inner.base_client.is_active()
623 }
624
625 /// The server used by the client.
626 ///
627 /// See `Self::server` to learn more.
628 pub fn server(&self) -> Option<&Url> {
629 self.inner.server.as_ref()
630 }
631
632 /// The homeserver of the client.
633 pub fn homeserver(&self) -> Url {
634 self.inner.homeserver.read().unwrap().clone()
635 }
636
637 /// Get the sliding sync version.
638 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
639 self.inner.sliding_sync_version.read().unwrap().clone()
640 }
641
642 /// Override the sliding sync version.
643 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
644 let mut lock = self.inner.sliding_sync_version.write().unwrap();
645 *lock = version;
646 }
647
648 /// Get the Matrix user session meta information.
649 ///
650 /// If the client is currently logged in, this will return a
651 /// [`SessionMeta`] object which contains the user ID and device ID.
652 /// Otherwise it returns `None`.
653 pub fn session_meta(&self) -> Option<&SessionMeta> {
654 self.base_client().session_meta()
655 }
656
657 /// Returns a receiver that gets events for each room info update. To watch
658 /// for new events, use `receiver.resubscribe()`.
659 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
660 self.base_client().room_info_notable_update_receiver()
661 }
662
663 /// Performs a search for users.
664 /// The search is performed case-insensitively on user IDs and display names
665 ///
666 /// # Arguments
667 ///
668 /// * `search_term` - The search term for the search
669 /// * `limit` - The maximum number of results to return. Defaults to 10.
670 ///
671 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
672 pub async fn search_users(
673 &self,
674 search_term: &str,
675 limit: u64,
676 ) -> HttpResult<search_users::v3::Response> {
677 let mut request = search_users::v3::Request::new(search_term.to_owned());
678
679 if let Some(limit) = UInt::new(limit) {
680 request.limit = limit;
681 }
682
683 self.send(request).await
684 }
685
686 /// Get the user id of the current owner of the client.
687 pub fn user_id(&self) -> Option<&UserId> {
688 self.session_meta().map(|s| s.user_id.as_ref())
689 }
690
691 /// Get the device ID that identifies the current session.
692 pub fn device_id(&self) -> Option<&DeviceId> {
693 self.session_meta().map(|s| s.device_id.as_ref())
694 }
695
696 /// Get the current access token for this session.
697 ///
698 /// Will be `None` if the client has not been logged in.
699 pub fn access_token(&self) -> Option<String> {
700 self.auth_ctx().access_token()
701 }
702
703 /// Get the current tokens for this session.
704 ///
705 /// To be notified of changes in the session tokens, use
706 /// [`Client::subscribe_to_session_changes()`] or
707 /// [`Client::set_session_callbacks()`].
708 ///
709 /// Returns `None` if the client has not been logged in.
710 pub fn session_tokens(&self) -> Option<SessionTokens> {
711 self.auth_ctx().session_tokens()
712 }
713
714 /// Access the authentication API used to log in this client.
715 ///
716 /// Will be `None` if the client has not been logged in.
717 pub fn auth_api(&self) -> Option<AuthApi> {
718 match self.auth_ctx().auth_data.get()? {
719 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
720 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
721 }
722 }
723
724 /// Get the whole session info of this client.
725 ///
726 /// Will be `None` if the client has not been logged in.
727 ///
728 /// Can be used with [`Client::restore_session`] to restore a previously
729 /// logged-in session.
730 pub fn session(&self) -> Option<AuthSession> {
731 match self.auth_api()? {
732 AuthApi::Matrix(api) => api.session().map(Into::into),
733 AuthApi::OAuth(api) => api.full_session().map(Into::into),
734 }
735 }
736
737 /// Get a reference to the state store.
738 pub fn state_store(&self) -> &DynStateStore {
739 self.base_client().state_store()
740 }
741
742 /// Get a reference to the event cache store.
743 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
744 self.base_client().event_cache_store()
745 }
746
747 /// Get a reference to the media store.
748 pub fn media_store(&self) -> &MediaStoreLock {
749 self.base_client().media_store()
750 }
751
752 /// Access the native Matrix authentication API with this client.
753 pub fn matrix_auth(&self) -> MatrixAuth {
754 MatrixAuth::new(self.clone())
755 }
756
757 /// Get the account of the current owner of the client.
758 pub fn account(&self) -> Account {
759 Account::new(self.clone())
760 }
761
762 /// Get the encryption manager of the client.
763 #[cfg(feature = "e2e-encryption")]
764 pub fn encryption(&self) -> Encryption {
765 Encryption::new(self.clone())
766 }
767
768 /// Get the media manager of the client.
769 pub fn media(&self) -> Media {
770 Media::new(self.clone())
771 }
772
773 /// Get the pusher manager of the client.
774 pub fn pusher(&self) -> Pusher {
775 Pusher::new(self.clone())
776 }
777
778 /// Access the OAuth 2.0 API of the client.
779 pub fn oauth(&self) -> OAuth {
780 OAuth::new(self.clone())
781 }
782
783 /// Register a handler for a specific event type.
784 ///
785 /// The handler is a function or closure with one or more arguments. The
786 /// first argument is the event itself. All additional arguments are
787 /// "context" arguments: They have to implement [`EventHandlerContext`].
788 /// This trait is named that way because most of the types implementing it
789 /// give additional context about an event: The room it was in, its raw form
790 /// and other similar things. As two exceptions to this,
791 /// [`Client`] and [`EventHandlerHandle`] also implement the
792 /// `EventHandlerContext` trait so you don't have to clone your client
793 /// into the event handler manually and a handler can decide to remove
794 /// itself.
795 ///
796 /// Some context arguments are not universally applicable. A context
797 /// argument that isn't available for the given event type will result in
798 /// the event handler being skipped and an error being logged. The following
799 /// context argument types are only available for a subset of event types:
800 ///
801 /// * [`Room`] is only available for room-specific events, i.e. not for
802 /// events like global account data events or presence events.
803 ///
804 /// You can provide custom context via
805 /// [`add_event_handler_context`](Client::add_event_handler_context) and
806 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
807 /// into the event handler.
808 ///
809 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
810 ///
811 /// # Examples
812 ///
813 /// ```no_run
814 /// use matrix_sdk::{
815 /// deserialized_responses::EncryptionInfo,
816 /// event_handler::Ctx,
817 /// ruma::{
818 /// events::{
819 /// macros::EventContent,
820 /// push_rules::PushRulesEvent,
821 /// room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
822 /// },
823 /// push::Action,
824 /// Int, MilliSecondsSinceUnixEpoch,
825 /// },
826 /// Client, Room,
827 /// };
828 /// use serde::{Deserialize, Serialize};
829 ///
830 /// # async fn example(client: Client) {
831 /// client.add_event_handler(
832 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
833 /// // Common usage: Room event plus room and client.
834 /// },
835 /// );
836 /// client.add_event_handler(
837 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
838 /// async move {
839 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
840 /// // unencrypted events and events that were decrypted by the SDK.
841 /// }
842 /// },
843 /// );
844 /// client.add_event_handler(
845 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
846 /// async move {
847 /// // A `Vec<Action>` parameter allows you to know which push actions
848 /// // are applicable for an event. For example, an event with
849 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
850 /// // in the timeline.
851 /// }
852 /// },
853 /// );
854 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
855 /// // You can omit any or all arguments after the first.
856 /// });
857 ///
858 /// // Registering a temporary event handler:
859 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
860 /// /* Event handler */
861 /// });
862 /// client.remove_event_handler(handle);
863 ///
864 /// // Registering custom event handler context:
865 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
866 /// struct MyContext {
867 /// number: usize,
868 /// }
869 /// client.add_event_handler_context(MyContext { number: 5 });
870 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
871 /// // Use the context
872 /// });
873 ///
874 /// // Custom events work exactly the same way, you just need to declare
875 /// // the content struct and use the EventContent derive macro on it.
876 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
877 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
878 /// struct TokenEventContent {
879 /// token: String,
880 /// #[serde(rename = "exp")]
881 /// expires_at: MilliSecondsSinceUnixEpoch,
882 /// }
883 ///
884 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
885 /// todo!("Display the token");
886 /// });
887 ///
888 /// // Event handler closures can also capture local variables.
889 /// // Make sure they are cheap to clone though, because they will be cloned
890 /// // every time the closure is called.
891 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
892 ///
893 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
894 /// println!("Calling the handler with identifier {data}");
895 /// });
896 /// # }
897 /// ```
898 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
899 where
900 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
901 H: EventHandler<Ev, Ctx>,
902 {
903 self.add_event_handler_impl(handler, None)
904 }
905
906 /// Register a handler for a specific room, and event type.
907 ///
908 /// This method works the same way as
909 /// [`add_event_handler`][Self::add_event_handler], except that the handler
910 /// will only be called for events in the room with the specified ID. See
911 /// that method for more details on event handler functions.
912 ///
913 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
914 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
915 /// your use case.
916 pub fn add_room_event_handler<Ev, Ctx, H>(
917 &self,
918 room_id: &RoomId,
919 handler: H,
920 ) -> EventHandlerHandle
921 where
922 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
923 H: EventHandler<Ev, Ctx>,
924 {
925 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
926 }
927
928 /// Observe a specific event type.
929 ///
930 /// `Ev` represents the kind of event that will be observed. `Ctx`
931 /// represents the context that will come with the event. It relies on the
932 /// same mechanism as [`Client::add_event_handler`]. The main difference is
933 /// that it returns an [`ObservableEventHandler`] and doesn't require a
934 /// user-defined closure. It is possible to subscribe to the
935 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
936 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
937 /// Ctx)`.
938 ///
939 /// Be careful that only the most recent value can be observed. Subscribers
940 /// are notified when a new value is sent, but there is no guarantee
941 /// that they will see all values.
942 ///
943 /// # Example
944 ///
945 /// Let's see a classical usage:
946 ///
947 /// ```
948 /// use futures_util::StreamExt as _;
949 /// use matrix_sdk::{
950 /// Client, Room,
951 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
952 /// };
953 ///
954 /// # async fn example(client: Client) -> Option<()> {
955 /// let observer =
956 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
957 ///
958 /// let mut subscriber = observer.subscribe();
959 ///
960 /// let (event, (room, push_actions)) = subscriber.next().await?;
961 /// # Some(())
962 /// # }
963 /// ```
964 ///
965 /// Now let's see how to get several contexts that can be useful for you:
966 ///
967 /// ```
968 /// use matrix_sdk::{
969 /// Client, Room,
970 /// deserialized_responses::EncryptionInfo,
971 /// ruma::{
972 /// events::room::{
973 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
974 /// },
975 /// push::Action,
976 /// },
977 /// };
978 ///
979 /// # async fn example(client: Client) {
980 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
981 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
982 ///
983 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
984 /// // to distinguish between unencrypted events and events that were decrypted
985 /// // by the SDK.
986 /// let _ = client
987 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
988 /// );
989 ///
990 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
991 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
992 /// // should be highlighted in the timeline.
993 /// let _ =
994 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
995 ///
996 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
997 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
998 /// # }
999 /// ```
1000 ///
1001 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1002 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1003 where
1004 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1005 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1006 {
1007 self.observe_room_events_impl(None)
1008 }
1009
1010 /// Observe a specific room, and event type.
1011 ///
1012 /// This method works the same way as [`Client::observe_events`], except
1013 /// that the observability will only be applied for events in the room with
1014 /// the specified ID. See that method for more details.
1015 ///
1016 /// Be careful that only the most recent value can be observed. Subscribers
1017 /// are notified when a new value is sent, but there is no guarantee
1018 /// that they will see all values.
1019 pub fn observe_room_events<Ev, Ctx>(
1020 &self,
1021 room_id: &RoomId,
1022 ) -> ObservableEventHandler<(Ev, Ctx)>
1023 where
1024 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1025 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1026 {
1027 self.observe_room_events_impl(Some(room_id.to_owned()))
1028 }
1029
1030 /// Shared implementation for `Client::observe_events` and
1031 /// `Client::observe_room_events`.
1032 fn observe_room_events_impl<Ev, Ctx>(
1033 &self,
1034 room_id: Option<OwnedRoomId>,
1035 ) -> ObservableEventHandler<(Ev, Ctx)>
1036 where
1037 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1038 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1039 {
1040 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1041 // new value.
1042 let shared_observable = SharedObservable::new(None);
1043
1044 ObservableEventHandler::new(
1045 shared_observable.clone(),
1046 self.event_handler_drop_guard(self.add_event_handler_impl(
1047 move |event: Ev, context: Ctx| {
1048 shared_observable.set(Some((event, context)));
1049
1050 ready(())
1051 },
1052 room_id,
1053 )),
1054 )
1055 }
1056
1057 /// Remove the event handler associated with the handle.
1058 ///
1059 /// Note that you **must not** call `remove_event_handler` from the
1060 /// non-async part of an event handler, that is:
1061 ///
1062 /// ```ignore
1063 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1064 /// // ⚠ this will cause a deadlock ⚠
1065 /// client.remove_event_handler(handle);
1066 ///
1067 /// async move {
1068 /// // removing the event handler here is fine
1069 /// client.remove_event_handler(handle);
1070 /// }
1071 /// })
1072 /// ```
1073 ///
1074 /// Note also that handlers that remove themselves will still execute with
1075 /// events received in the same sync cycle.
1076 ///
1077 /// # Arguments
1078 ///
1079 /// `handle` - The [`EventHandlerHandle`] that is returned when
1080 /// registering the event handler with [`Client::add_event_handler`].
1081 ///
1082 /// # Examples
1083 ///
1084 /// ```no_run
1085 /// # use url::Url;
1086 /// # use tokio::sync::mpsc;
1087 /// #
1088 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1089 /// #
1090 /// use matrix_sdk::{
1091 /// Client, event_handler::EventHandlerHandle,
1092 /// ruma::events::room::member::SyncRoomMemberEvent,
1093 /// };
1094 /// #
1095 /// # futures_executor::block_on(async {
1096 /// # let client = matrix_sdk::Client::builder()
1097 /// # .homeserver_url(homeserver)
1098 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1099 /// # .build()
1100 /// # .await
1101 /// # .unwrap();
1102 ///
1103 /// client.add_event_handler(
1104 /// |ev: SyncRoomMemberEvent,
1105 /// client: Client,
1106 /// handle: EventHandlerHandle| async move {
1107 /// // Common usage: Check arriving Event is the expected one
1108 /// println!("Expected RoomMemberEvent received!");
1109 /// client.remove_event_handler(handle);
1110 /// },
1111 /// );
1112 /// # });
1113 /// ```
1114 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1115 self.inner.event_handlers.remove(handle);
1116 }
1117
1118 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1119 /// the given handle.
1120 ///
1121 /// When the returned value is dropped, the event handler will be removed.
1122 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1123 EventHandlerDropGuard::new(handle, self.clone())
1124 }
1125
1126 /// Add an arbitrary value for use as event handler context.
1127 ///
1128 /// The value can be obtained in an event handler by adding an argument of
1129 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1130 ///
1131 /// If a value of the same type has been added before, it will be
1132 /// overwritten.
1133 ///
1134 /// # Examples
1135 ///
1136 /// ```no_run
1137 /// use matrix_sdk::{
1138 /// Room, event_handler::Ctx,
1139 /// ruma::events::room::message::SyncRoomMessageEvent,
1140 /// };
1141 /// # #[derive(Clone)]
1142 /// # struct SomeType;
1143 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1144 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1145 /// # futures_executor::block_on(async {
1146 /// # let client = matrix_sdk::Client::builder()
1147 /// # .homeserver_url(homeserver)
1148 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1149 /// # .build()
1150 /// # .await
1151 /// # .unwrap();
1152 ///
1153 /// // Handle used to send messages to the UI part of the app
1154 /// let my_gui_handle: SomeType = obtain_gui_handle();
1155 ///
1156 /// client.add_event_handler_context(my_gui_handle.clone());
1157 /// client.add_event_handler(
1158 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1159 /// async move {
1160 /// // gui_handle.send(DisplayMessage { message: ev });
1161 /// }
1162 /// },
1163 /// );
1164 /// # });
1165 /// ```
1166 pub fn add_event_handler_context<T>(&self, ctx: T)
1167 where
1168 T: Clone + Send + Sync + 'static,
1169 {
1170 self.inner.event_handlers.add_context(ctx);
1171 }
1172
1173 /// Register a handler for a notification.
1174 ///
1175 /// Similar to [`Client::add_event_handler`], but only allows functions
1176 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1177 /// [`Client`] for now.
1178 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1179 where
1180 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1181 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1182 {
1183 self.inner.notification_handlers.write().await.push(Box::new(
1184 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1185 ));
1186
1187 self
1188 }
1189
1190 /// Subscribe to all updates for the room with the given ID.
1191 ///
1192 /// The returned receiver will receive a new message for each sync response
1193 /// that contains updates for that room.
1194 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1195 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1196 btree_map::Entry::Vacant(entry) => {
1197 let (tx, rx) = broadcast::channel(8);
1198 entry.insert(tx);
1199 rx
1200 }
1201 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1202 }
1203 }
1204
1205 /// Subscribe to all updates to all rooms, whenever any has been received in
1206 /// a sync response.
1207 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1208 self.inner.room_updates_sender.subscribe()
1209 }
1210
1211 pub(crate) async fn notification_handlers(
1212 &self,
1213 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1214 self.inner.notification_handlers.read().await
1215 }
1216
1217 /// Get all the rooms the client knows about.
1218 ///
1219 /// This will return the list of joined, invited, and left rooms.
1220 pub fn rooms(&self) -> Vec<Room> {
1221 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1222 }
1223
1224 /// Get all the rooms the client knows about, filtered by room state.
1225 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1226 self.base_client()
1227 .rooms_filtered(filter)
1228 .into_iter()
1229 .map(|room| Room::new(self.clone(), room))
1230 .collect()
1231 }
1232
1233 /// Get a stream of all the rooms, in addition to the existing rooms.
1234 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1235 let (rooms, stream) = self.base_client().rooms_stream();
1236
1237 let map_room = |room| Room::new(self.clone(), room);
1238
1239 (
1240 rooms.into_iter().map(map_room).collect(),
1241 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1242 )
1243 }
1244
1245 /// Returns the joined rooms this client knows about.
1246 pub fn joined_rooms(&self) -> Vec<Room> {
1247 self.rooms_filtered(RoomStateFilter::JOINED)
1248 }
1249
1250 /// Returns the invited rooms this client knows about.
1251 pub fn invited_rooms(&self) -> Vec<Room> {
1252 self.rooms_filtered(RoomStateFilter::INVITED)
1253 }
1254
1255 /// Returns the left rooms this client knows about.
1256 pub fn left_rooms(&self) -> Vec<Room> {
1257 self.rooms_filtered(RoomStateFilter::LEFT)
1258 }
1259
1260 /// Returns the joined space rooms this client knows about.
1261 pub fn joined_space_rooms(&self) -> Vec<Room> {
1262 self.base_client()
1263 .rooms_filtered(RoomStateFilter::JOINED)
1264 .into_iter()
1265 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1266 .collect()
1267 }
1268
1269 /// Get a room with the given room id.
1270 ///
1271 /// # Arguments
1272 ///
1273 /// `room_id` - The unique id of the room that should be fetched.
1274 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1275 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1276 }
1277
1278 /// Gets the preview of a room, whether the current user has joined it or
1279 /// not.
1280 pub async fn get_room_preview(
1281 &self,
1282 room_or_alias_id: &RoomOrAliasId,
1283 via: Vec<OwnedServerName>,
1284 ) -> Result<RoomPreview> {
1285 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1286 Ok(room_id) => room_id.to_owned(),
1287 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1288 };
1289
1290 if let Some(room) = self.get_room(&room_id) {
1291 // The cached data can only be trusted if the room state is joined or
1292 // banned: for invite and knock rooms, no updates will be received
1293 // for the rooms after the invite/knock action took place so we may
1294 // have very out to date data for important fields such as
1295 // `join_rule`. For left rooms, the homeserver should return the latest info.
1296 match room.state() {
1297 RoomState::Joined | RoomState::Banned => {
1298 return Ok(RoomPreview::from_known_room(&room).await);
1299 }
1300 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1301 }
1302 }
1303
1304 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1305 }
1306
1307 /// Resolve a room alias to a room id and a list of servers which know
1308 /// about it.
1309 ///
1310 /// # Arguments
1311 ///
1312 /// `room_alias` - The room alias to be resolved.
1313 pub async fn resolve_room_alias(
1314 &self,
1315 room_alias: &RoomAliasId,
1316 ) -> HttpResult<get_alias::v3::Response> {
1317 let request = get_alias::v3::Request::new(room_alias.to_owned());
1318 self.send(request).await
1319 }
1320
1321 /// Checks if a room alias is not in use yet.
1322 ///
1323 /// Returns:
1324 /// - `Ok(true)` if the room alias is available.
1325 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1326 /// status code).
1327 /// - An `Err` otherwise.
1328 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1329 match self.resolve_room_alias(alias).await {
1330 // The room alias was resolved, so it's already in use.
1331 Ok(_) => Ok(false),
1332 Err(error) => {
1333 match error.client_api_error_kind() {
1334 // The room alias wasn't found, so it's available.
1335 Some(ErrorKind::NotFound) => Ok(true),
1336 _ => Err(error),
1337 }
1338 }
1339 }
1340 }
1341
1342 /// Adds a new room alias associated with a room to the room directory.
1343 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1344 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1345 self.send(request).await?;
1346 Ok(())
1347 }
1348
1349 /// Removes a room alias from the room directory.
1350 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1351 let request = delete_alias::v3::Request::new(alias.to_owned());
1352 self.send(request).await?;
1353 Ok(())
1354 }
1355
1356 /// Update the homeserver from the login response well-known if needed.
1357 ///
1358 /// # Arguments
1359 ///
1360 /// * `login_well_known` - The `well_known` field from a successful login
1361 /// response.
1362 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1363 if self.inner.respect_login_well_known
1364 && let Some(well_known) = login_well_known
1365 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1366 {
1367 self.set_homeserver(homeserver);
1368 }
1369 }
1370
1371 /// Similar to [`Client::restore_session_with`], with
1372 /// [`RoomLoadSettings::default()`].
1373 ///
1374 /// # Panics
1375 ///
1376 /// Panics if a session was already restored or logged in.
1377 #[instrument(skip_all)]
1378 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1379 self.restore_session_with(session, RoomLoadSettings::default()).await
1380 }
1381
1382 /// Restore a session previously logged-in using one of the available
1383 /// authentication APIs. The number of rooms to restore is controlled by
1384 /// [`RoomLoadSettings`].
1385 ///
1386 /// See the documentation of the corresponding authentication API's
1387 /// `restore_session` method for more information.
1388 ///
1389 /// # Panics
1390 ///
1391 /// Panics if a session was already restored or logged in.
1392 #[instrument(skip_all)]
1393 pub async fn restore_session_with(
1394 &self,
1395 session: impl Into<AuthSession>,
1396 room_load_settings: RoomLoadSettings,
1397 ) -> Result<()> {
1398 let session = session.into();
1399 match session {
1400 AuthSession::Matrix(session) => {
1401 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1402 }
1403 AuthSession::OAuth(session) => {
1404 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1405 }
1406 }
1407 }
1408
1409 /// Refresh the access token using the authentication API used to log into
1410 /// this session.
1411 ///
1412 /// See the documentation of the authentication API's `refresh_access_token`
1413 /// method for more information.
1414 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1415 let Some(auth_api) = self.auth_api() else {
1416 return Err(RefreshTokenError::RefreshTokenRequired);
1417 };
1418
1419 match auth_api {
1420 AuthApi::Matrix(api) => {
1421 trace!("Token refresh: Using the homeserver.");
1422 Box::pin(api.refresh_access_token()).await?;
1423 }
1424 AuthApi::OAuth(api) => {
1425 trace!("Token refresh: Using OAuth 2.0.");
1426 Box::pin(api.refresh_access_token()).await?;
1427 }
1428 }
1429
1430 Ok(())
1431 }
1432
1433 /// Log out the current session using the proper authentication API.
1434 ///
1435 /// # Errors
1436 ///
1437 /// Returns an error if the session is not authenticated or if an error
1438 /// occurred while making the request to the server.
1439 pub async fn logout(&self) -> Result<(), Error> {
1440 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1441 match auth_api {
1442 AuthApi::Matrix(matrix_auth) => {
1443 matrix_auth.logout().await?;
1444 Ok(())
1445 }
1446 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1447 }
1448 }
1449
1450 /// Get or upload a sync filter.
1451 ///
1452 /// This method will either get a filter ID from the store or upload the
1453 /// filter definition to the homeserver and return the new filter ID.
1454 ///
1455 /// # Arguments
1456 ///
1457 /// * `filter_name` - The unique name of the filter, this name will be used
1458 /// locally to store and identify the filter ID returned by the server.
1459 ///
1460 /// * `definition` - The filter definition that should be uploaded to the
1461 /// server if no filter ID can be found in the store.
1462 ///
1463 /// # Examples
1464 ///
1465 /// ```no_run
1466 /// # use matrix_sdk::{
1467 /// # Client, config::SyncSettings,
1468 /// # ruma::api::client::{
1469 /// # filter::{
1470 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1471 /// # },
1472 /// # sync::sync_events::v3::Filter,
1473 /// # }
1474 /// # };
1475 /// # use url::Url;
1476 /// # async {
1477 /// # let homeserver = Url::parse("http://example.com").unwrap();
1478 /// # let client = Client::new(homeserver).await.unwrap();
1479 /// let mut filter = FilterDefinition::default();
1480 ///
1481 /// // Let's enable member lazy loading.
1482 /// filter.room.state.lazy_load_options =
1483 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1484 ///
1485 /// let filter_id = client
1486 /// .get_or_upload_filter("sync", filter)
1487 /// .await
1488 /// .unwrap();
1489 ///
1490 /// let sync_settings = SyncSettings::new()
1491 /// .filter(Filter::FilterId(filter_id));
1492 ///
1493 /// let response = client.sync_once(sync_settings).await.unwrap();
1494 /// # };
1495 #[instrument(skip(self, definition))]
1496 pub async fn get_or_upload_filter(
1497 &self,
1498 filter_name: &str,
1499 definition: FilterDefinition,
1500 ) -> Result<String> {
1501 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1502 debug!("Found filter locally");
1503 Ok(filter)
1504 } else {
1505 debug!("Didn't find filter locally");
1506 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1507 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1508 let response = self.send(request).await?;
1509
1510 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1511
1512 Ok(response.filter_id)
1513 }
1514 }
1515
1516 /// Prepare to join a room by ID, by getting the current details about it
1517 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1518 let room = self.get_room(room_id)?;
1519
1520 let inviter = match room.invite_details().await {
1521 Ok(details) => details.inviter,
1522 Err(Error::WrongRoomState(_)) => None,
1523 Err(e) => {
1524 warn!("Error fetching invite details for room: {e:?}");
1525 None
1526 }
1527 };
1528
1529 Some(PreJoinRoomInfo { inviter })
1530 }
1531
1532 /// Finish joining a room.
1533 ///
1534 /// If the room was an invite that should be marked as a DM, will include it
1535 /// in the DM event after creating the joined room.
1536 ///
1537 /// If encrypted history sharing is enabled, will check to see if we have a
1538 /// key bundle, and import it if so.
1539 ///
1540 /// # Arguments
1541 ///
1542 /// * `room_id` - The `RoomId` of the room that was joined.
1543 /// * `pre_join_room_info` - Information about the room before we joined.
1544 async fn finish_join_room(
1545 &self,
1546 room_id: &RoomId,
1547 pre_join_room_info: Option<PreJoinRoomInfo>,
1548 ) -> Result<Room> {
1549 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1550 room.state() == RoomState::Invited
1551 && room.is_direct().await.unwrap_or_else(|e| {
1552 warn!(%room_id, "is_direct() failed: {e}");
1553 false
1554 })
1555 } else {
1556 false
1557 };
1558
1559 let base_room = self
1560 .base_client()
1561 .room_joined(
1562 room_id,
1563 pre_join_room_info
1564 .as_ref()
1565 .and_then(|info| info.inviter.as_ref())
1566 .map(|i| i.user_id().to_owned()),
1567 )
1568 .await?;
1569 let room = Room::new(self.clone(), base_room);
1570
1571 if mark_as_dm {
1572 room.set_is_direct(true).await?;
1573 }
1574
1575 #[cfg(feature = "e2e-encryption")]
1576 if self.inner.enable_share_history_on_invite
1577 && let Some(inviter) =
1578 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1579 {
1580 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1581 .await?;
1582 }
1583
1584 // Suppress "unused variable" and "unused field" lints
1585 #[cfg(not(feature = "e2e-encryption"))]
1586 let _ = pre_join_room_info.map(|i| i.inviter);
1587
1588 Ok(room)
1589 }
1590
1591 /// Join a room by `RoomId`.
1592 ///
1593 /// Returns the `Room` in the joined state.
1594 ///
1595 /// # Arguments
1596 ///
1597 /// * `room_id` - The `RoomId` of the room to be joined.
1598 #[instrument(skip(self))]
1599 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1600 // See who invited us to this room, if anyone. Note we have to do this before
1601 // making the `/join` request, otherwise we could race against the sync.
1602 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1603
1604 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1605 let response = self.send(request).await?;
1606 self.finish_join_room(&response.room_id, pre_join_info).await
1607 }
1608
1609 /// Join a room by `RoomOrAliasId`.
1610 ///
1611 /// Returns the `Room` in the joined state.
1612 ///
1613 /// # Arguments
1614 ///
1615 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1616 /// alias looks like `#name:example.com`.
1617 /// * `server_names` - The server names to be used for resolving the alias,
1618 /// if needs be.
1619 pub async fn join_room_by_id_or_alias(
1620 &self,
1621 alias: &RoomOrAliasId,
1622 server_names: &[OwnedServerName],
1623 ) -> Result<Room> {
1624 let pre_join_info = {
1625 match alias.try_into() {
1626 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1627 Err(_) => {
1628 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1629 // responding to an invitation to the room, and therefore don't need to handle
1630 // things that happen as a result of invites.
1631 None
1632 }
1633 }
1634 };
1635 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1636 via: server_names.to_owned(),
1637 });
1638 let response = self.send(request).await?;
1639 self.finish_join_room(&response.room_id, pre_join_info).await
1640 }
1641
1642 /// Search the homeserver's directory of public rooms.
1643 ///
1644 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1645 /// a `get_public_rooms::Response`.
1646 ///
1647 /// # Arguments
1648 ///
1649 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1650 ///
1651 /// * `since` - Pagination token from a previous request.
1652 ///
1653 /// * `server` - The name of the server, if `None` the requested server is
1654 /// used.
1655 ///
1656 /// # Examples
1657 /// ```no_run
1658 /// use matrix_sdk::Client;
1659 /// # use url::Url;
1660 /// # let homeserver = Url::parse("http://example.com").unwrap();
1661 /// # let limit = Some(10);
1662 /// # let since = Some("since token");
1663 /// # let server = Some("servername.com".try_into().unwrap());
1664 /// # async {
1665 /// let mut client = Client::new(homeserver).await.unwrap();
1666 ///
1667 /// client.public_rooms(limit, since, server).await;
1668 /// # };
1669 /// ```
1670 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1671 pub async fn public_rooms(
1672 &self,
1673 limit: Option<u32>,
1674 since: Option<&str>,
1675 server: Option<&ServerName>,
1676 ) -> HttpResult<get_public_rooms::v3::Response> {
1677 let limit = limit.map(UInt::from);
1678
1679 let request = assign!(get_public_rooms::v3::Request::new(), {
1680 limit,
1681 since: since.map(ToOwned::to_owned),
1682 server: server.map(ToOwned::to_owned),
1683 });
1684 self.send(request).await
1685 }
1686
1687 /// Create a room with the given parameters.
1688 ///
1689 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1690 /// created room.
1691 ///
1692 /// If you want to create a direct message with one specific user, you can
1693 /// use [`create_dm`][Self::create_dm], which is more convenient than
1694 /// assembling the [`create_room::v3::Request`] yourself.
1695 ///
1696 /// If the `is_direct` field of the request is set to `true` and at least
1697 /// one user is invited, the room will be automatically added to the direct
1698 /// rooms in the account data.
1699 ///
1700 /// # Examples
1701 ///
1702 /// ```no_run
1703 /// use matrix_sdk::{
1704 /// Client,
1705 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1706 /// };
1707 /// # use url::Url;
1708 /// #
1709 /// # async {
1710 /// # let homeserver = Url::parse("http://example.com").unwrap();
1711 /// let request = CreateRoomRequest::new();
1712 /// let client = Client::new(homeserver).await.unwrap();
1713 /// assert!(client.create_room(request).await.is_ok());
1714 /// # };
1715 /// ```
1716 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1717 let invite = request.invite.clone();
1718 let is_direct_room = request.is_direct;
1719 let response = self.send(request).await?;
1720
1721 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1722
1723 let joined_room = Room::new(self.clone(), base_room);
1724
1725 if is_direct_room
1726 && !invite.is_empty()
1727 && let Err(error) =
1728 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1729 {
1730 // FIXME: Retry in the background
1731 error!("Failed to mark room as DM: {error}");
1732 }
1733
1734 Ok(joined_room)
1735 }
1736
1737 /// Create a DM room.
1738 ///
1739 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1740 /// given user being invited, the room marked `is_direct` and both the
1741 /// creator and invitee getting the default maximum power level.
1742 ///
1743 /// If the `e2e-encryption` feature is enabled, the room will also be
1744 /// encrypted.
1745 ///
1746 /// # Arguments
1747 ///
1748 /// * `user_id` - The ID of the user to create a DM for.
1749 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1750 #[cfg(feature = "e2e-encryption")]
1751 let initial_state = vec![
1752 InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1753 .to_raw_any(),
1754 ];
1755
1756 #[cfg(not(feature = "e2e-encryption"))]
1757 let initial_state = vec![];
1758
1759 let request = assign!(create_room::v3::Request::new(), {
1760 invite: vec![user_id.to_owned()],
1761 is_direct: true,
1762 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1763 initial_state,
1764 });
1765
1766 self.create_room(request).await
1767 }
1768
1769 /// Search the homeserver's directory for public rooms with a filter.
1770 ///
1771 /// # Arguments
1772 ///
1773 /// * `room_search` - The easiest way to create this request is using the
1774 /// `get_public_rooms_filtered::Request` itself.
1775 ///
1776 /// # Examples
1777 ///
1778 /// ```no_run
1779 /// # use url::Url;
1780 /// # use matrix_sdk::Client;
1781 /// # async {
1782 /// # let homeserver = Url::parse("http://example.com")?;
1783 /// use matrix_sdk::ruma::{
1784 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1785 /// };
1786 /// # let mut client = Client::new(homeserver).await?;
1787 ///
1788 /// let mut filter = Filter::new();
1789 /// filter.generic_search_term = Some("rust".to_owned());
1790 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1791 /// request.filter = filter;
1792 ///
1793 /// let response = client.public_rooms_filtered(request).await?;
1794 ///
1795 /// for room in response.chunk {
1796 /// println!("Found room {room:?}");
1797 /// }
1798 /// # anyhow::Ok(()) };
1799 /// ```
1800 pub async fn public_rooms_filtered(
1801 &self,
1802 request: get_public_rooms_filtered::v3::Request,
1803 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1804 self.send(request).await
1805 }
1806
1807 /// Send an arbitrary request to the server, without updating client state.
1808 ///
1809 /// **Warning:** Because this method *does not* update the client state, it
1810 /// is important to make sure that you account for this yourself, and
1811 /// use wrapper methods where available. This method should *only* be
1812 /// used if a wrapper method for the endpoint you'd like to use is not
1813 /// available.
1814 ///
1815 /// # Arguments
1816 ///
1817 /// * `request` - A filled out and valid request for the endpoint to be hit
1818 ///
1819 /// * `timeout` - An optional request timeout setting, this overrides the
1820 /// default request setting if one was set.
1821 ///
1822 /// # Examples
1823 ///
1824 /// ```no_run
1825 /// # use matrix_sdk::{Client, config::SyncSettings};
1826 /// # use url::Url;
1827 /// # async {
1828 /// # let homeserver = Url::parse("http://localhost:8080")?;
1829 /// # let mut client = Client::new(homeserver).await?;
1830 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1831 ///
1832 /// // First construct the request you want to make
1833 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1834 /// // for all available Endpoints
1835 /// let user_id = user_id!("@example:localhost").to_owned();
1836 /// let request = profile::get_profile::v3::Request::new(user_id);
1837 ///
1838 /// // Start the request using Client::send()
1839 /// let response = client.send(request).await?;
1840 ///
1841 /// // Check the corresponding Response struct to find out what types are
1842 /// // returned
1843 /// # anyhow::Ok(()) };
1844 /// ```
1845 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1846 where
1847 Request: OutgoingRequest + Clone + Debug,
1848 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1849 {
1850 SendRequest {
1851 client: self.clone(),
1852 request,
1853 config: None,
1854 send_progress: Default::default(),
1855 }
1856 }
1857
1858 pub(crate) async fn send_inner<Request>(
1859 &self,
1860 request: Request,
1861 config: Option<RequestConfig>,
1862 send_progress: SharedObservable<TransmissionProgress>,
1863 ) -> HttpResult<Request::IncomingResponse>
1864 where
1865 Request: OutgoingRequest + Debug,
1866 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1867 {
1868 let homeserver = self.homeserver().to_string();
1869 let access_token = self.access_token();
1870
1871 self.inner
1872 .http_client
1873 .send(
1874 request,
1875 config,
1876 homeserver,
1877 access_token.as_deref(),
1878 &self.supported_versions().await?,
1879 send_progress,
1880 )
1881 .await
1882 }
1883
1884 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1885 _ = self
1886 .inner
1887 .auth_ctx
1888 .session_change_sender
1889 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1890 }
1891
1892 /// Fetches server versions from network; no caching.
1893 pub async fn fetch_server_versions(
1894 &self,
1895 request_config: Option<RequestConfig>,
1896 ) -> HttpResult<get_supported_versions::Response> {
1897 let server_versions = self
1898 .inner
1899 .http_client
1900 .send(
1901 get_supported_versions::Request::new(),
1902 request_config,
1903 self.homeserver().to_string(),
1904 None,
1905 &SupportedVersions {
1906 versions: [MatrixVersion::V1_0].into(),
1907 features: Default::default(),
1908 },
1909 Default::default(),
1910 )
1911 .await?;
1912
1913 Ok(server_versions)
1914 }
1915
1916 /// Fetches client well_known from network; no caching.
1917 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1918 let server_url_string = self
1919 .server()
1920 .unwrap_or(
1921 // Sometimes people configure their well-known directly on the homeserver so use
1922 // this as a fallback when the server name is unknown.
1923 &self.homeserver(),
1924 )
1925 .to_string();
1926
1927 let well_known = self
1928 .inner
1929 .http_client
1930 .send(
1931 discover_homeserver::Request::new(),
1932 Some(RequestConfig::short_retry()),
1933 server_url_string,
1934 None,
1935 &SupportedVersions {
1936 versions: [MatrixVersion::V1_0].into(),
1937 features: Default::default(),
1938 },
1939 Default::default(),
1940 )
1941 .await;
1942
1943 match well_known {
1944 Ok(well_known) => Some(well_known),
1945 Err(http_error) => {
1946 // It is perfectly valid to not have a well-known file.
1947 // Maybe we should check for a specific error code to be sure?
1948 warn!("Failed to fetch client well-known: {http_error}");
1949 None
1950 }
1951 }
1952 }
1953
1954 /// Load server info from storage, or fetch them from network and cache
1955 /// them.
1956 async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1957 match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1958 Ok(Some(stored)) => {
1959 if let Some(server_info) =
1960 stored.into_server_info().and_then(|info| info.maybe_decode())
1961 {
1962 return Ok(server_info);
1963 }
1964 }
1965 Ok(None) => {
1966 // fallthrough: cache is empty
1967 }
1968 Err(err) => {
1969 warn!("error when loading cached server info: {err}");
1970 // fallthrough to network.
1971 }
1972 }
1973
1974 let server_versions = self.fetch_server_versions(None).await?;
1975 let well_known = self.fetch_client_well_known().await;
1976 let server_info = ServerInfo::new(
1977 server_versions.versions.clone(),
1978 server_versions.unstable_features.clone(),
1979 well_known.map(Into::into),
1980 );
1981
1982 // Attempt to cache the result in storage.
1983 {
1984 if let Err(err) = self
1985 .state_store()
1986 .set_kv_data(
1987 StateStoreDataKey::ServerInfo,
1988 StateStoreDataValue::ServerInfo(server_info.clone()),
1989 )
1990 .await
1991 {
1992 warn!("error when caching server info: {err}");
1993 }
1994 }
1995
1996 Ok(server_info)
1997 }
1998
1999 async fn get_or_load_and_cache_server_info<
2000 Value,
2001 MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
2002 >(
2003 &self,
2004 map: MapFunction,
2005 ) -> HttpResult<Value> {
2006 let server_info = &self.inner.caches.server_info;
2007 if let CachedValue::Cached(val) = map(&*server_info.read().await) {
2008 return Ok(val);
2009 }
2010
2011 let mut guarded_server_info = server_info.write().await;
2012 if let CachedValue::Cached(val) = map(&guarded_server_info) {
2013 return Ok(val);
2014 }
2015
2016 let server_info = self.load_or_fetch_server_info().await?;
2017
2018 // Fill both unstable features and server versions at once.
2019 let mut supported = server_info.supported_versions();
2020 if supported.versions.is_empty() {
2021 supported.versions = [MatrixVersion::V1_0].into();
2022 }
2023
2024 guarded_server_info.supported_versions = CachedValue::Cached(supported);
2025 guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2026
2027 // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2028 // fetch an optional property), the function will always return some.
2029 Ok(map(&guarded_server_info).unwrap_cached_value())
2030 }
2031
2032 /// Get the Matrix versions and features supported by the homeserver by
2033 /// fetching them from the server or the cache.
2034 ///
2035 /// This is equivalent to calling both [`Client::server_versions()`] and
2036 /// [`Client::unstable_features()`]. To always fetch the result from the
2037 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2038 /// and then `.as_supported_versions()` on the response.
2039 ///
2040 /// # Examples
2041 ///
2042 /// ```no_run
2043 /// use ruma::api::{FeatureFlag, MatrixVersion};
2044 /// # use matrix_sdk::{Client, config::SyncSettings};
2045 /// # use url::Url;
2046 /// # async {
2047 /// # let homeserver = Url::parse("http://localhost:8080")?;
2048 /// # let mut client = Client::new(homeserver).await?;
2049 ///
2050 /// let supported = client.supported_versions().await?;
2051 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2052 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2053 ///
2054 /// let msc_x_feature = FeatureFlag::from("msc_x");
2055 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2056 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2057 /// # anyhow::Ok(()) };
2058 /// ```
2059 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2060 self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2061 .await
2062 }
2063
2064 /// Get the Matrix versions supported by the homeserver by fetching them
2065 /// from the server or the cache.
2066 ///
2067 /// # Examples
2068 ///
2069 /// ```no_run
2070 /// use ruma::api::MatrixVersion;
2071 /// # use matrix_sdk::{Client, config::SyncSettings};
2072 /// # use url::Url;
2073 /// # async {
2074 /// # let homeserver = Url::parse("http://localhost:8080")?;
2075 /// # let mut client = Client::new(homeserver).await?;
2076 ///
2077 /// let server_versions = client.server_versions().await?;
2078 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2079 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2080 /// # anyhow::Ok(()) };
2081 /// ```
2082 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2083 self.get_or_load_and_cache_server_info(|server_info| {
2084 server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2085 })
2086 .await
2087 }
2088
2089 /// Get the unstable features supported by the homeserver by fetching them
2090 /// from the server or the cache.
2091 ///
2092 /// # Examples
2093 ///
2094 /// ```no_run
2095 /// use matrix_sdk::ruma::api::FeatureFlag;
2096 /// # use matrix_sdk::{Client, config::SyncSettings};
2097 /// # use url::Url;
2098 /// # async {
2099 /// # let homeserver = Url::parse("http://localhost:8080")?;
2100 /// # let mut client = Client::new(homeserver).await?;
2101 ///
2102 /// let msc_x_feature = FeatureFlag::from("msc_x");
2103 /// let unstable_features = client.unstable_features().await?;
2104 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2105 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2106 /// # anyhow::Ok(()) };
2107 /// ```
2108 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2109 self.get_or_load_and_cache_server_info(|server_info| {
2110 server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2111 })
2112 .await
2113 }
2114
2115 /// Get information about the homeserver's advertised RTC foci by fetching
2116 /// the well-known file from the server or the cache.
2117 ///
2118 /// # Examples
2119 /// ```no_run
2120 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2121 /// # use url::Url;
2122 /// # async {
2123 /// # let homeserver = Url::parse("http://localhost:8080")?;
2124 /// # let mut client = Client::new(homeserver).await?;
2125 /// let rtc_foci = client.rtc_foci().await?;
2126 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2127 /// RtcFocusInfo::LiveKit(info) => Some(info),
2128 /// _ => None,
2129 /// });
2130 /// if let Some(info) = default_livekit_focus_info {
2131 /// println!("Default LiveKit service URL: {}", info.service_url);
2132 /// }
2133 /// # anyhow::Ok(()) };
2134 /// ```
2135 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2136 let well_known = self
2137 .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2138 .await?;
2139
2140 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2141 }
2142
2143 /// Empty the server version and unstable features cache.
2144 ///
2145 /// Since the SDK caches server info (versions, unstable features,
2146 /// well-known etc), it's possible to have a stale entry in the cache. This
2147 /// functions makes it possible to force reset it.
2148 pub async fn reset_server_info(&self) -> Result<()> {
2149 // Empty the in-memory caches.
2150 let mut guard = self.inner.caches.server_info.write().await;
2151 guard.supported_versions = CachedValue::NotSet;
2152
2153 // Empty the store cache.
2154 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2155 }
2156
2157 /// Check whether MSC 4028 is enabled on the homeserver.
2158 ///
2159 /// # Examples
2160 ///
2161 /// ```no_run
2162 /// # use matrix_sdk::{Client, config::SyncSettings};
2163 /// # use url::Url;
2164 /// # async {
2165 /// # let homeserver = Url::parse("http://localhost:8080")?;
2166 /// # let mut client = Client::new(homeserver).await?;
2167 /// let msc4028_enabled =
2168 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2169 /// # anyhow::Ok(()) };
2170 /// ```
2171 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2172 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2173 }
2174
2175 /// Get information of all our own devices.
2176 ///
2177 /// # Examples
2178 ///
2179 /// ```no_run
2180 /// # use matrix_sdk::{Client, config::SyncSettings};
2181 /// # use url::Url;
2182 /// # async {
2183 /// # let homeserver = Url::parse("http://localhost:8080")?;
2184 /// # let mut client = Client::new(homeserver).await?;
2185 /// let response = client.devices().await?;
2186 ///
2187 /// for device in response.devices {
2188 /// println!(
2189 /// "Device: {} {}",
2190 /// device.device_id,
2191 /// device.display_name.as_deref().unwrap_or("")
2192 /// );
2193 /// }
2194 /// # anyhow::Ok(()) };
2195 /// ```
2196 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2197 let request = get_devices::v3::Request::new();
2198
2199 self.send(request).await
2200 }
2201
2202 /// Delete the given devices from the server.
2203 ///
2204 /// # Arguments
2205 ///
2206 /// * `devices` - The list of devices that should be deleted from the
2207 /// server.
2208 ///
2209 /// * `auth_data` - This request requires user interactive auth, the first
2210 /// request needs to set this to `None` and will always fail with an
2211 /// `UiaaResponse`. The response will contain information for the
2212 /// interactive auth and the same request needs to be made but this time
2213 /// with some `auth_data` provided.
2214 ///
2215 /// ```no_run
2216 /// # use matrix_sdk::{
2217 /// # ruma::{api::client::uiaa, device_id},
2218 /// # Client, Error, config::SyncSettings,
2219 /// # };
2220 /// # use serde_json::json;
2221 /// # use url::Url;
2222 /// # use std::collections::BTreeMap;
2223 /// # async {
2224 /// # let homeserver = Url::parse("http://localhost:8080")?;
2225 /// # let mut client = Client::new(homeserver).await?;
2226 /// let devices = &[device_id!("DEVICEID").to_owned()];
2227 ///
2228 /// if let Err(e) = client.delete_devices(devices, None).await {
2229 /// if let Some(info) = e.as_uiaa_response() {
2230 /// let mut password = uiaa::Password::new(
2231 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2232 /// "wordpass".to_owned(),
2233 /// );
2234 /// password.session = info.session.clone();
2235 ///
2236 /// client
2237 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2238 /// .await?;
2239 /// }
2240 /// }
2241 /// # anyhow::Ok(()) };
2242 pub async fn delete_devices(
2243 &self,
2244 devices: &[OwnedDeviceId],
2245 auth_data: Option<uiaa::AuthData>,
2246 ) -> HttpResult<delete_devices::v3::Response> {
2247 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2248 request.auth = auth_data;
2249
2250 self.send(request).await
2251 }
2252
2253 /// Change the display name of a device owned by the current user.
2254 ///
2255 /// Returns a `update_device::Response` which specifies the result
2256 /// of the operation.
2257 ///
2258 /// # Arguments
2259 ///
2260 /// * `device_id` - The ID of the device to change the display name of.
2261 /// * `display_name` - The new display name to set.
2262 pub async fn rename_device(
2263 &self,
2264 device_id: &DeviceId,
2265 display_name: &str,
2266 ) -> HttpResult<update_device::v3::Response> {
2267 let mut request = update_device::v3::Request::new(device_id.to_owned());
2268 request.display_name = Some(display_name.to_owned());
2269
2270 self.send(request).await
2271 }
2272
2273 /// Synchronize the client's state with the latest state on the server.
2274 ///
2275 /// ## Syncing Events
2276 ///
2277 /// Messages or any other type of event need to be periodically fetched from
2278 /// the server, this is achieved by sending a `/sync` request to the server.
2279 ///
2280 /// The first sync is sent out without a [`token`]. The response of the
2281 /// first sync will contain a [`next_batch`] field which should then be
2282 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2283 /// don't receive the same events multiple times.
2284 ///
2285 /// ## Long Polling
2286 ///
2287 /// A sync should in the usual case always be in flight. The
2288 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2289 /// long the server will wait for new events before it will respond.
2290 /// The server will respond immediately if some new events arrive before the
2291 /// timeout has expired. If no changes arrive and the timeout expires an
2292 /// empty sync response will be sent to the client.
2293 ///
2294 /// This method of sending a request that may not receive a response
2295 /// immediately is called long polling.
2296 ///
2297 /// ## Filtering Events
2298 ///
2299 /// The number or type of messages and events that the client should receive
2300 /// from the server can be altered using a [`Filter`].
2301 ///
2302 /// Filters can be non-trivial and, since they will be sent with every sync
2303 /// request, they may take up a bunch of unnecessary bandwidth.
2304 ///
2305 /// Luckily filters can be uploaded to the server and reused using an unique
2306 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2307 /// method.
2308 ///
2309 /// # Arguments
2310 ///
2311 /// * `sync_settings` - Settings for the sync call, this allows us to set
2312 /// various options to configure the sync:
2313 /// * [`filter`] - To configure which events we receive and which get
2314 /// [filtered] by the server
2315 /// * [`timeout`] - To configure our [long polling] setup.
2316 /// * [`token`] - To tell the server which events we already received
2317 /// and where we wish to continue syncing.
2318 /// * [`full_state`] - To tell the server that we wish to receive all
2319 /// state events, regardless of our configured [`token`].
2320 /// * [`set_presence`] - To tell the server to set the presence and to
2321 /// which state.
2322 ///
2323 /// # Examples
2324 ///
2325 /// ```no_run
2326 /// # use url::Url;
2327 /// # async {
2328 /// # let homeserver = Url::parse("http://localhost:8080")?;
2329 /// # let username = "";
2330 /// # let password = "";
2331 /// use matrix_sdk::{
2332 /// Client, config::SyncSettings,
2333 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2334 /// };
2335 ///
2336 /// let client = Client::new(homeserver).await?;
2337 /// client.matrix_auth().login_username(username, password).send().await?;
2338 ///
2339 /// // Sync once so we receive the client state and old messages.
2340 /// client.sync_once(SyncSettings::default()).await?;
2341 ///
2342 /// // Register our handler so we start responding once we receive a new
2343 /// // event.
2344 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2345 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2346 /// });
2347 ///
2348 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2349 /// // from our `sync_once()` call automatically.
2350 /// client.sync(SyncSettings::default()).await;
2351 /// # anyhow::Ok(()) };
2352 /// ```
2353 ///
2354 /// [`sync`]: #method.sync
2355 /// [`SyncSettings`]: crate::config::SyncSettings
2356 /// [`token`]: crate::config::SyncSettings#method.token
2357 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2358 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2359 /// [`set_presence`]: ruma::presence::PresenceState
2360 /// [`filter`]: crate::config::SyncSettings#method.filter
2361 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2362 /// [`next_batch`]: SyncResponse#structfield.next_batch
2363 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2364 /// [long polling]: #long-polling
2365 /// [filtered]: #filtering-events
2366 #[instrument(skip(self))]
2367 pub async fn sync_once(
2368 &self,
2369 sync_settings: crate::config::SyncSettings,
2370 ) -> Result<SyncResponse> {
2371 // The sync might not return for quite a while due to the timeout.
2372 // We'll see if there's anything crypto related to send out before we
2373 // sync, i.e. if we closed our client after a sync but before the
2374 // crypto requests were sent out.
2375 //
2376 // This will mostly be a no-op.
2377 #[cfg(feature = "e2e-encryption")]
2378 if let Err(e) = self.send_outgoing_requests().await {
2379 error!(error = ?e, "Error while sending outgoing E2EE requests");
2380 }
2381
2382 let token = match sync_settings.token {
2383 SyncToken::Specific(token) => Some(token),
2384 SyncToken::NoToken => None,
2385 SyncToken::ReusePrevious => self.sync_token().await,
2386 };
2387
2388 let request = assign!(sync_events::v3::Request::new(), {
2389 filter: sync_settings.filter.map(|f| *f),
2390 since: token,
2391 full_state: sync_settings.full_state,
2392 set_presence: sync_settings.set_presence,
2393 timeout: sync_settings.timeout,
2394 use_state_after: true,
2395 });
2396 let mut request_config = self.request_config();
2397 if let Some(timeout) = sync_settings.timeout {
2398 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2399 request_config.timeout = Some(base_timeout + timeout);
2400 }
2401
2402 let response = self.send(request).with_request_config(request_config).await?;
2403 let next_batch = response.next_batch.clone();
2404 let response = self.process_sync(response).await?;
2405
2406 #[cfg(feature = "e2e-encryption")]
2407 if let Err(e) = self.send_outgoing_requests().await {
2408 error!(error = ?e, "Error while sending outgoing E2EE requests");
2409 }
2410
2411 self.inner.sync_beat.notify(usize::MAX);
2412
2413 Ok(SyncResponse::new(next_batch, response))
2414 }
2415
2416 /// Repeatedly synchronize the client state with the server.
2417 ///
2418 /// This method will only return on error, if cancellation is needed
2419 /// the method should be wrapped in a cancelable task or the
2420 /// [`Client::sync_with_callback`] method can be used or
2421 /// [`Client::sync_with_result_callback`] if you want to handle error
2422 /// cases in the loop, too.
2423 ///
2424 /// This method will internally call [`Client::sync_once`] in a loop.
2425 ///
2426 /// This method can be used with the [`Client::add_event_handler`]
2427 /// method to react to individual events. If you instead wish to handle
2428 /// events in a bulk manner the [`Client::sync_with_callback`],
2429 /// [`Client::sync_with_result_callback`] and
2430 /// [`Client::sync_stream`] methods can be used instead. Those methods
2431 /// repeatedly return the whole sync response.
2432 ///
2433 /// # Arguments
2434 ///
2435 /// * `sync_settings` - Settings for the sync call. *Note* that those
2436 /// settings will be only used for the first sync call. See the argument
2437 /// docs for [`Client::sync_once`] for more info.
2438 ///
2439 /// # Return
2440 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2441 /// up to the user of the API to check the error and decide whether the sync
2442 /// should continue or not.
2443 ///
2444 /// # Examples
2445 ///
2446 /// ```no_run
2447 /// # use url::Url;
2448 /// # async {
2449 /// # let homeserver = Url::parse("http://localhost:8080")?;
2450 /// # let username = "";
2451 /// # let password = "";
2452 /// use matrix_sdk::{
2453 /// Client, config::SyncSettings,
2454 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2455 /// };
2456 ///
2457 /// let client = Client::new(homeserver).await?;
2458 /// client.matrix_auth().login_username(&username, &password).send().await?;
2459 ///
2460 /// // Register our handler so we start responding once we receive a new
2461 /// // event.
2462 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2463 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2464 /// });
2465 ///
2466 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2467 /// // automatically.
2468 /// client.sync(SyncSettings::default()).await?;
2469 /// # anyhow::Ok(()) };
2470 /// ```
2471 ///
2472 /// [argument docs]: #method.sync_once
2473 /// [`sync_with_callback`]: #method.sync_with_callback
2474 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2475 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2476 }
2477
2478 /// Repeatedly call sync to synchronize the client state with the server.
2479 ///
2480 /// # Arguments
2481 ///
2482 /// * `sync_settings` - Settings for the sync call. *Note* that those
2483 /// settings will be only used for the first sync call. See the argument
2484 /// docs for [`Client::sync_once`] for more info.
2485 ///
2486 /// * `callback` - A callback that will be called every time a successful
2487 /// response has been fetched from the server. The callback must return a
2488 /// boolean which signalizes if the method should stop syncing. If the
2489 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2490 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2491 ///
2492 /// # Return
2493 /// The sync runs until an error occurs or the
2494 /// callback indicates that the Loop should stop. If the callback asked for
2495 /// a regular stop, the result will be `Ok(())` otherwise the
2496 /// `Err(Error)` is returned.
2497 ///
2498 /// # Examples
2499 ///
2500 /// The following example demonstrates how to sync forever while sending all
2501 /// the interesting events through a mpsc channel to another thread e.g. a
2502 /// UI thread.
2503 ///
2504 /// ```no_run
2505 /// # use std::time::Duration;
2506 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2507 /// # use url::Url;
2508 /// # async {
2509 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2510 /// # let mut client = Client::new(homeserver).await.unwrap();
2511 ///
2512 /// use tokio::sync::mpsc::channel;
2513 ///
2514 /// let (tx, rx) = channel(100);
2515 ///
2516 /// let sync_channel = &tx;
2517 /// let sync_settings = SyncSettings::new()
2518 /// .timeout(Duration::from_secs(30));
2519 ///
2520 /// client
2521 /// .sync_with_callback(sync_settings, |response| async move {
2522 /// let channel = sync_channel;
2523 /// for (room_id, room) in response.rooms.joined {
2524 /// for event in room.timeline.events {
2525 /// channel.send(event).await.unwrap();
2526 /// }
2527 /// }
2528 ///
2529 /// LoopCtrl::Continue
2530 /// })
2531 /// .await;
2532 /// };
2533 /// ```
2534 #[instrument(skip_all)]
2535 pub async fn sync_with_callback<C>(
2536 &self,
2537 sync_settings: crate::config::SyncSettings,
2538 callback: impl Fn(SyncResponse) -> C,
2539 ) -> Result<(), Error>
2540 where
2541 C: Future<Output = LoopCtrl>,
2542 {
2543 self.sync_with_result_callback(sync_settings, |result| async {
2544 Ok(callback(result?).await)
2545 })
2546 .await
2547 }
2548
2549 /// Repeatedly call sync to synchronize the client state with the server.
2550 ///
2551 /// # Arguments
2552 ///
2553 /// * `sync_settings` - Settings for the sync call. *Note* that those
2554 /// settings will be only used for the first sync call. See the argument
2555 /// docs for [`Client::sync_once`] for more info.
2556 ///
2557 /// * `callback` - A callback that will be called every time after a
2558 /// response has been received, failure or not. The callback returns a
2559 /// `Result<LoopCtrl, Error>`, too. When returning
2560 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2561 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2562 /// function returns `Ok(())`. In case the callback can't handle the
2563 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2564 /// which results in the sync ending and the `Err(Error)` being returned.
2565 ///
2566 /// # Return
2567 /// The sync runs until an error occurs that the callback can't handle or
2568 /// the callback indicates that the Loop should stop. If the callback
2569 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2570 /// `Err(Error)` is returned.
2571 ///
2572 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2573 /// this, and are handled first without sending the result to the
2574 /// callback. Only after they have exceeded is the `Result` handed to
2575 /// the callback.
2576 ///
2577 /// # Examples
2578 ///
2579 /// The following example demonstrates how to sync forever while sending all
2580 /// the interesting events through a mpsc channel to another thread e.g. a
2581 /// UI thread.
2582 ///
2583 /// ```no_run
2584 /// # use std::time::Duration;
2585 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2586 /// # use url::Url;
2587 /// # async {
2588 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2589 /// # let mut client = Client::new(homeserver).await.unwrap();
2590 /// #
2591 /// use tokio::sync::mpsc::channel;
2592 ///
2593 /// let (tx, rx) = channel(100);
2594 ///
2595 /// let sync_channel = &tx;
2596 /// let sync_settings = SyncSettings::new()
2597 /// .timeout(Duration::from_secs(30));
2598 ///
2599 /// client
2600 /// .sync_with_result_callback(sync_settings, |response| async move {
2601 /// let channel = sync_channel;
2602 /// let sync_response = response?;
2603 /// for (room_id, room) in sync_response.rooms.joined {
2604 /// for event in room.timeline.events {
2605 /// channel.send(event).await.unwrap();
2606 /// }
2607 /// }
2608 ///
2609 /// Ok(LoopCtrl::Continue)
2610 /// })
2611 /// .await;
2612 /// };
2613 /// ```
2614 #[instrument(skip(self, callback))]
2615 pub async fn sync_with_result_callback<C>(
2616 &self,
2617 sync_settings: crate::config::SyncSettings,
2618 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2619 ) -> Result<(), Error>
2620 where
2621 C: Future<Output = Result<LoopCtrl, Error>>,
2622 {
2623 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2624
2625 while let Some(result) = sync_stream.next().await {
2626 trace!("Running callback");
2627 if callback(result).await? == LoopCtrl::Break {
2628 trace!("Callback told us to stop");
2629 break;
2630 }
2631 trace!("Done running callback");
2632 }
2633
2634 Ok(())
2635 }
2636
2637 //// Repeatedly synchronize the client state with the server.
2638 ///
2639 /// This method will internally call [`Client::sync_once`] in a loop and is
2640 /// equivalent to the [`Client::sync`] method but the responses are provided
2641 /// as an async stream.
2642 ///
2643 /// # Arguments
2644 ///
2645 /// * `sync_settings` - Settings for the sync call. *Note* that those
2646 /// settings will be only used for the first sync call. See the argument
2647 /// docs for [`Client::sync_once`] for more info.
2648 ///
2649 /// # Examples
2650 ///
2651 /// ```no_run
2652 /// # use url::Url;
2653 /// # async {
2654 /// # let homeserver = Url::parse("http://localhost:8080")?;
2655 /// # let username = "";
2656 /// # let password = "";
2657 /// use futures_util::StreamExt;
2658 /// use matrix_sdk::{Client, config::SyncSettings};
2659 ///
2660 /// let client = Client::new(homeserver).await?;
2661 /// client.matrix_auth().login_username(&username, &password).send().await?;
2662 ///
2663 /// let mut sync_stream =
2664 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2665 ///
2666 /// while let Some(Ok(response)) = sync_stream.next().await {
2667 /// for room in response.rooms.joined.values() {
2668 /// for e in &room.timeline.events {
2669 /// if let Ok(event) = e.raw().deserialize() {
2670 /// println!("Received event {:?}", event);
2671 /// }
2672 /// }
2673 /// }
2674 /// }
2675 ///
2676 /// # anyhow::Ok(()) };
2677 /// ```
2678 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2679 #[instrument(skip(self))]
2680 pub async fn sync_stream(
2681 &self,
2682 mut sync_settings: crate::config::SyncSettings,
2683 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2684 let mut is_first_sync = true;
2685 let mut timeout = None;
2686 let mut last_sync_time: Option<Instant> = None;
2687
2688 let parent_span = Span::current();
2689
2690 async_stream::stream!({
2691 loop {
2692 trace!("Syncing");
2693
2694 if sync_settings.ignore_timeout_on_first_sync {
2695 if is_first_sync {
2696 timeout = sync_settings.timeout.take();
2697 } else if sync_settings.timeout.is_none() && timeout.is_some() {
2698 sync_settings.timeout = timeout.take();
2699 }
2700
2701 is_first_sync = false;
2702 }
2703
2704 yield self
2705 .sync_loop_helper(&mut sync_settings)
2706 .instrument(parent_span.clone())
2707 .await;
2708
2709 Client::delay_sync(&mut last_sync_time).await
2710 }
2711 })
2712 }
2713
2714 /// Get the current, if any, sync token of the client.
2715 /// This will be None if the client didn't sync at least once.
2716 pub(crate) async fn sync_token(&self) -> Option<String> {
2717 self.inner.base_client.sync_token().await
2718 }
2719
2720 /// Gets information about the owner of a given access token.
2721 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2722 let request = whoami::v3::Request::new();
2723 self.send(request).await
2724 }
2725
2726 /// Subscribes a new receiver to client SessionChange broadcasts.
2727 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2728 let broadcast = &self.auth_ctx().session_change_sender;
2729 broadcast.subscribe()
2730 }
2731
2732 /// Sets the save/restore session callbacks.
2733 ///
2734 /// This is another mechanism to get synchronous updates to session tokens,
2735 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2736 pub fn set_session_callbacks(
2737 &self,
2738 reload_session_callback: Box<ReloadSessionCallback>,
2739 save_session_callback: Box<SaveSessionCallback>,
2740 ) -> Result<()> {
2741 self.inner
2742 .auth_ctx
2743 .reload_session_callback
2744 .set(reload_session_callback)
2745 .map_err(|_| Error::MultipleSessionCallbacks)?;
2746
2747 self.inner
2748 .auth_ctx
2749 .save_session_callback
2750 .set(save_session_callback)
2751 .map_err(|_| Error::MultipleSessionCallbacks)?;
2752
2753 Ok(())
2754 }
2755
2756 /// Get the notification settings of the current owner of the client.
2757 pub async fn notification_settings(&self) -> NotificationSettings {
2758 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2759 NotificationSettings::new(self.clone(), ruleset)
2760 }
2761
2762 /// Create a new specialized `Client` that can process notifications.
2763 ///
2764 /// See [`CrossProcessLock::new`] to learn more about
2765 /// `cross_process_store_locks_holder_name`.
2766 ///
2767 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
2768 pub async fn notification_client(
2769 &self,
2770 cross_process_store_locks_holder_name: String,
2771 ) -> Result<Client> {
2772 let client = Client {
2773 inner: ClientInner::new(
2774 self.inner.auth_ctx.clone(),
2775 self.server().cloned(),
2776 self.homeserver(),
2777 self.sliding_sync_version(),
2778 self.inner.http_client.clone(),
2779 self.inner
2780 .base_client
2781 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2782 .await?,
2783 self.inner.caches.server_info.read().await.clone(),
2784 self.inner.respect_login_well_known,
2785 self.inner.event_cache.clone(),
2786 self.inner.send_queue_data.clone(),
2787 self.inner.latest_events.clone(),
2788 #[cfg(feature = "e2e-encryption")]
2789 self.inner.e2ee.encryption_settings,
2790 #[cfg(feature = "e2e-encryption")]
2791 self.inner.enable_share_history_on_invite,
2792 cross_process_store_locks_holder_name,
2793 #[cfg(feature = "experimental-search")]
2794 self.inner.search_index.clone(),
2795 self.inner.thread_subscription_catchup.clone(),
2796 )
2797 .await,
2798 };
2799
2800 Ok(client)
2801 }
2802
2803 /// The [`EventCache`] instance for this [`Client`].
2804 pub fn event_cache(&self) -> &EventCache {
2805 // SAFETY: always initialized in the `Client` ctor.
2806 self.inner.event_cache.get().unwrap()
2807 }
2808
2809 /// The [`LatestEvents`] instance for this [`Client`].
2810 pub async fn latest_events(&self) -> &LatestEvents {
2811 self.inner
2812 .latest_events
2813 .get_or_init(|| async {
2814 LatestEvents::new(
2815 WeakClient::from_client(self),
2816 self.event_cache().clone(),
2817 SendQueue::new(self.clone()),
2818 )
2819 })
2820 .await
2821 }
2822
2823 /// Waits until an at least partially synced room is received, and returns
2824 /// it.
2825 ///
2826 /// **Note: this function will loop endlessly until either it finds the room
2827 /// or an externally set timeout happens.**
2828 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2829 loop {
2830 if let Some(room) = self.get_room(room_id) {
2831 if room.is_state_partially_or_fully_synced() {
2832 debug!("Found just created room!");
2833 return room;
2834 }
2835 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2836 } else {
2837 debug!("Room wasn't found, waiting for sync beat to try again");
2838 }
2839 self.inner.sync_beat.listen().await;
2840 }
2841 }
2842
2843 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2844 /// join it.
2845 pub async fn knock(
2846 &self,
2847 room_id_or_alias: OwnedRoomOrAliasId,
2848 reason: Option<String>,
2849 server_names: Vec<OwnedServerName>,
2850 ) -> Result<Room> {
2851 let request =
2852 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2853 let response = self.send(request).await?;
2854 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2855 Ok(Room::new(self.clone(), base_room))
2856 }
2857
2858 /// Checks whether the provided `user_id` belongs to an ignored user.
2859 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2860 self.base_client().is_user_ignored(user_id).await
2861 }
2862
2863 /// Gets the `max_upload_size` value from the homeserver, getting either a
2864 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2865 /// missing.
2866 ///
2867 /// Check the spec for more info:
2868 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2869 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2870 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2871 if let Some(data) = max_upload_size_lock.get() {
2872 return Ok(data.to_owned());
2873 }
2874
2875 // Use the authenticated endpoint when the server supports it.
2876 let supported_versions = self.supported_versions().await?;
2877 let use_auth =
2878 authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2879
2880 let upload_size = if use_auth {
2881 self.send(authenticated_media::get_media_config::v1::Request::default())
2882 .await?
2883 .upload_size
2884 } else {
2885 #[allow(deprecated)]
2886 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2887 };
2888
2889 match max_upload_size_lock.set(upload_size) {
2890 Ok(_) => Ok(upload_size),
2891 Err(error) => {
2892 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2893 }
2894 }
2895 }
2896
2897 /// The settings to use for decrypting events.
2898 #[cfg(feature = "e2e-encryption")]
2899 pub fn decryption_settings(&self) -> &DecryptionSettings {
2900 &self.base_client().decryption_settings
2901 }
2902
2903 /// Returns the [`SearchIndex`] for this [`Client`].
2904 #[cfg(feature = "experimental-search")]
2905 pub fn search_index(&self) -> &SearchIndex {
2906 &self.inner.search_index
2907 }
2908
2909 /// Whether the client is configured to take thread subscriptions (MSC4306
2910 /// and MSC4308) into account.
2911 ///
2912 /// This may cause filtering out of thread subscriptions, and loading the
2913 /// thread subscriptions via the sliding sync extension, when the room
2914 /// list service is being used.
2915 pub fn enabled_thread_subscriptions(&self) -> bool {
2916 match self.base_client().threading_support {
2917 ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2918 ThreadingSupport::Disabled => false,
2919 }
2920 }
2921
2922 /// Fetch thread subscriptions changes between `from` and up to `to`.
2923 ///
2924 /// The `limit` optional parameter can be used to limit the number of
2925 /// entries in a response. It can also be overridden by the server, if
2926 /// it's deemed too large.
2927 pub async fn fetch_thread_subscriptions(
2928 &self,
2929 from: Option<String>,
2930 to: Option<String>,
2931 limit: Option<UInt>,
2932 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2933 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2934 from,
2935 to,
2936 limit,
2937 });
2938 Ok(self.send(request).await?)
2939 }
2940
2941 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
2942 self.inner.thread_subscription_catchup.get().unwrap()
2943 }
2944}
2945
2946#[cfg(any(feature = "testing", test))]
2947impl Client {
2948 /// Test helper to mark users as tracked by the crypto layer.
2949 #[cfg(feature = "e2e-encryption")]
2950 pub async fn update_tracked_users_for_testing(
2951 &self,
2952 user_ids: impl IntoIterator<Item = &UserId>,
2953 ) {
2954 let olm = self.olm_machine().await;
2955 let olm = olm.as_ref().unwrap();
2956 olm.update_tracked_users(user_ids).await.unwrap();
2957 }
2958}
2959
2960/// A weak reference to the inner client, useful when trying to get a handle
2961/// on the owning client.
2962#[derive(Clone, Debug)]
2963pub(crate) struct WeakClient {
2964 client: Weak<ClientInner>,
2965}
2966
2967impl WeakClient {
2968 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2969 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2970 Self { client: Arc::downgrade(client) }
2971 }
2972
2973 /// Construct a [`WeakClient`] from a [`Client`].
2974 pub fn from_client(client: &Client) -> Self {
2975 Self::from_inner(&client.inner)
2976 }
2977
2978 /// Attempts to get a [`Client`] from this [`WeakClient`].
2979 pub fn get(&self) -> Option<Client> {
2980 self.client.upgrade().map(|inner| Client { inner })
2981 }
2982
2983 /// Gets the number of strong (`Arc`) pointers still pointing to this
2984 /// client.
2985 #[allow(dead_code)]
2986 pub fn strong_count(&self) -> usize {
2987 self.client.strong_count()
2988 }
2989}
2990
2991#[derive(Clone)]
2992struct ClientServerInfo {
2993 /// The Matrix versions and unstable features the server supports (known
2994 /// ones only).
2995 supported_versions: CachedValue<SupportedVersions>,
2996
2997 /// The server's well-known file, if any.
2998 well_known: CachedValue<Option<WellKnownResponse>>,
2999}
3000
3001/// A cached value that can either be set or not set, used to avoid confusion
3002/// between a value that is set to `None` (because it doesn't exist) and a value
3003/// that has not been cached yet.
3004#[derive(Clone)]
3005enum CachedValue<Value> {
3006 /// A value has been cached.
3007 Cached(Value),
3008 /// Nothing has been cached yet.
3009 NotSet,
3010}
3011
3012impl<Value> CachedValue<Value> {
3013 /// Unwraps the cached value, returning it if it exists.
3014 ///
3015 /// # Panics
3016 ///
3017 /// If the cached value is not set, this will panic.
3018 fn unwrap_cached_value(self) -> Value {
3019 match self {
3020 CachedValue::Cached(value) => value,
3021 CachedValue::NotSet => panic!("Tried to unwrap a cached value that wasn't set"),
3022 }
3023 }
3024
3025 /// Converts from `&CachedValue<Value>` to `CachedValue<&Value>`.
3026 fn as_ref(&self) -> CachedValue<&Value> {
3027 match self {
3028 Self::Cached(value) => CachedValue::Cached(value),
3029 Self::NotSet => CachedValue::NotSet,
3030 }
3031 }
3032
3033 /// Maps a `CachedValue<Value>` to `CachedValue<Other>` by applying a
3034 /// function to a contained value (if `Cached`) or returns `NotSet` (if
3035 /// `NotSet`).
3036 fn map<Other, F>(self, f: F) -> CachedValue<Other>
3037 where
3038 F: FnOnce(Value) -> Other,
3039 {
3040 match self {
3041 Self::Cached(value) => CachedValue::Cached(f(value)),
3042 Self::NotSet => CachedValue::NotSet,
3043 }
3044 }
3045}
3046
3047/// Information about the state of a room before we joined it.
3048#[derive(Debug, Clone, Default)]
3049struct PreJoinRoomInfo {
3050 /// The user who invited us to the room, if any.
3051 pub inviter: Option<RoomMember>,
3052}
3053
3054// The http mocking library is not supported for wasm32
3055#[cfg(all(test, not(target_family = "wasm")))]
3056pub(crate) mod tests {
3057 use std::{sync::Arc, time::Duration};
3058
3059 use assert_matches::assert_matches;
3060 use assert_matches2::assert_let;
3061 use eyeball::SharedObservable;
3062 use futures_util::{FutureExt, StreamExt, pin_mut};
3063 use js_int::{UInt, uint};
3064 use matrix_sdk_base::{
3065 RoomState,
3066 store::{MemoryStore, StoreConfig},
3067 };
3068 use matrix_sdk_test::{
3069 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3070 event_factory::EventFactory,
3071 };
3072 #[cfg(target_family = "wasm")]
3073 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3074
3075 use ruma::{
3076 RoomId, ServerName, UserId,
3077 api::{
3078 FeatureFlag, MatrixVersion,
3079 client::{
3080 discovery::discover_homeserver::RtcFocusInfo,
3081 room::create_room::v3::Request as CreateRoomRequest,
3082 },
3083 },
3084 assign,
3085 events::{
3086 ignored_user_list::IgnoredUserListEventContent,
3087 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3088 },
3089 owned_room_id, owned_user_id, room_alias_id, room_id,
3090 };
3091 use serde_json::json;
3092 use stream_assert::{assert_next_matches, assert_pending};
3093 use tokio::{
3094 spawn,
3095 time::{sleep, timeout},
3096 };
3097 use url::Url;
3098
3099 use super::Client;
3100 use crate::{
3101 Error, TransmissionProgress,
3102 client::{WeakClient, futures::SendMediaUploadRequest},
3103 config::{RequestConfig, SyncSettings},
3104 futures::SendRequest,
3105 media::MediaError,
3106 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3107 };
3108
3109 #[async_test]
3110 async fn test_account_data() {
3111 let server = MatrixMockServer::new().await;
3112 let client = server.client_builder().build().await;
3113
3114 let f = EventFactory::new();
3115 server
3116 .mock_sync()
3117 .ok_and_run(&client, |builder| {
3118 builder.add_global_account_data(
3119 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3120 );
3121 })
3122 .await;
3123
3124 let content = client
3125 .account()
3126 .account_data::<IgnoredUserListEventContent>()
3127 .await
3128 .unwrap()
3129 .unwrap()
3130 .deserialize()
3131 .unwrap();
3132
3133 assert_eq!(content.ignored_users.len(), 1);
3134 }
3135
3136 #[async_test]
3137 async fn test_successful_discovery() {
3138 // Imagine this is `matrix.org`.
3139 let server = MatrixMockServer::new().await;
3140 let server_url = server.uri();
3141
3142 // Imagine this is `matrix-client.matrix.org`.
3143 let homeserver = MatrixMockServer::new().await;
3144 let homeserver_url = homeserver.uri();
3145
3146 // Imagine Alice has the user ID `@alice:matrix.org`.
3147 let domain = server_url.strip_prefix("http://").unwrap();
3148 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3149
3150 // The `.well-known` is on the server (e.g. `matrix.org`).
3151 server
3152 .mock_well_known()
3153 .ok_with_homeserver_url(&homeserver_url)
3154 .mock_once()
3155 .named("well-known")
3156 .mount()
3157 .await;
3158
3159 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3160 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3161
3162 let client = Client::builder()
3163 .insecure_server_name_no_tls(alice.server_name())
3164 .build()
3165 .await
3166 .unwrap();
3167
3168 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3169 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3170 client.server_versions().await.unwrap();
3171 }
3172
3173 #[async_test]
3174 async fn test_discovery_broken_server() {
3175 let server = MatrixMockServer::new().await;
3176 let server_url = server.uri();
3177 let domain = server_url.strip_prefix("http://").unwrap();
3178 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3179
3180 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3181
3182 assert!(
3183 Client::builder()
3184 .insecure_server_name_no_tls(alice.server_name())
3185 .build()
3186 .await
3187 .is_err(),
3188 "Creating a client from a user ID should fail when the .well-known request fails."
3189 );
3190 }
3191
3192 #[async_test]
3193 async fn test_room_creation() {
3194 let server = MatrixMockServer::new().await;
3195 let client = server.client_builder().build().await;
3196
3197 server
3198 .mock_sync()
3199 .ok_and_run(&client, |builder| {
3200 builder.add_joined_room(
3201 JoinedRoomBuilder::default()
3202 .add_state_event(StateTestEvent::Member)
3203 .add_state_event(StateTestEvent::PowerLevels),
3204 );
3205 })
3206 .await;
3207
3208 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3209 assert_eq!(room.state(), RoomState::Joined);
3210 }
3211
3212 #[async_test]
3213 async fn test_retry_limit_http_requests() {
3214 let server = MatrixMockServer::new().await;
3215 let client = server
3216 .client_builder()
3217 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3218 .build()
3219 .await;
3220
3221 assert!(client.request_config().retry_limit.unwrap() == 3);
3222
3223 server.mock_login().error500().expect(3).mount().await;
3224
3225 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3226 }
3227
3228 #[async_test]
3229 async fn test_retry_timeout_http_requests() {
3230 // Keep this timeout small so that the test doesn't take long
3231 let retry_timeout = Duration::from_secs(5);
3232 let server = MatrixMockServer::new().await;
3233 let client = server
3234 .client_builder()
3235 .on_builder(|builder| {
3236 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3237 })
3238 .build()
3239 .await;
3240
3241 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3242
3243 server.mock_login().error500().expect(2..).mount().await;
3244
3245 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3246 }
3247
3248 #[async_test]
3249 async fn test_short_retry_initial_http_requests() {
3250 let server = MatrixMockServer::new().await;
3251 let client = server
3252 .client_builder()
3253 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3254 .build()
3255 .await;
3256
3257 server.mock_login().error500().expect(3..).mount().await;
3258
3259 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3260 }
3261
3262 #[async_test]
3263 async fn test_no_retry_http_requests() {
3264 let server = MatrixMockServer::new().await;
3265 let client = server.client_builder().build().await;
3266
3267 server.mock_devices().error500().mock_once().mount().await;
3268
3269 client.devices().await.unwrap_err();
3270 }
3271
3272 #[async_test]
3273 async fn test_set_homeserver() {
3274 let client = MockClientBuilder::new(None).build().await;
3275 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3276
3277 let homeserver = Url::parse("http://example.com/").unwrap();
3278 client.set_homeserver(homeserver.clone());
3279 assert_eq!(client.homeserver(), homeserver);
3280 }
3281
3282 #[async_test]
3283 async fn test_search_user_request() {
3284 let server = MatrixMockServer::new().await;
3285 let client = server.client_builder().build().await;
3286
3287 server.mock_user_directory().ok().mock_once().mount().await;
3288
3289 let response = client.search_users("test", 50).await.unwrap();
3290 assert_eq!(response.results.len(), 1);
3291 let result = &response.results[0];
3292 assert_eq!(result.user_id.to_string(), "@test:example.me");
3293 assert_eq!(result.display_name.clone().unwrap(), "Test");
3294 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3295 assert!(!response.limited);
3296 }
3297
3298 #[async_test]
3299 async fn test_request_unstable_features() {
3300 let server = MatrixMockServer::new().await;
3301 let client = server.client_builder().no_server_versions().build().await;
3302
3303 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3304
3305 let unstable_features = client.unstable_features().await.unwrap();
3306 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3307 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3308 }
3309
3310 #[async_test]
3311 async fn test_can_homeserver_push_encrypted_event_to_device() {
3312 let server = MatrixMockServer::new().await;
3313 let client = server.client_builder().no_server_versions().build().await;
3314
3315 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3316
3317 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3318 assert!(msc4028_enabled);
3319 }
3320
3321 #[async_test]
3322 async fn test_recently_visited_rooms() {
3323 // Tracking recently visited rooms requires authentication
3324 let client = MockClientBuilder::new(None).unlogged().build().await;
3325 assert_matches!(
3326 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3327 Err(Error::AuthenticationRequired)
3328 );
3329
3330 let client = MockClientBuilder::new(None).build().await;
3331 let account = client.account();
3332
3333 // We should start off with an empty list
3334 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3335
3336 // Tracking a valid room id should add it to the list
3337 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3338 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3339 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3340
3341 // And the existing list shouldn't be changed
3342 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3343 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3344
3345 // Tracking the same room again shouldn't change the list
3346 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3347 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3348 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3349
3350 // Tracking a second room should add it to the front of the list
3351 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3352 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3353 assert_eq!(
3354 account.get_recently_visited_rooms().await.unwrap(),
3355 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3356 );
3357
3358 // Tracking the first room yet again should move it to the front of the list
3359 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3360 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3361 assert_eq!(
3362 account.get_recently_visited_rooms().await.unwrap(),
3363 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3364 );
3365
3366 // Tracking should be capped at 20
3367 for n in 0..20 {
3368 account
3369 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3370 .await
3371 .unwrap();
3372 }
3373
3374 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3375
3376 // And the initial rooms should've been pushed out
3377 let rooms = account.get_recently_visited_rooms().await.unwrap();
3378 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3379 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3380
3381 // And the last tracked room should be the first
3382 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3383 }
3384
3385 #[async_test]
3386 async fn test_client_no_cycle_with_event_cache() {
3387 let client = MockClientBuilder::new(None).build().await;
3388
3389 // Wait for the init tasks to die.
3390 sleep(Duration::from_secs(1)).await;
3391
3392 let weak_client = WeakClient::from_client(&client);
3393 assert_eq!(weak_client.strong_count(), 1);
3394
3395 {
3396 let room_id = room_id!("!room:example.org");
3397
3398 // Have the client know the room.
3399 let response = SyncResponseBuilder::default()
3400 .add_joined_room(JoinedRoomBuilder::new(room_id))
3401 .build_sync_response();
3402 client.inner.base_client.receive_sync_response(response).await.unwrap();
3403
3404 client.event_cache().subscribe().unwrap();
3405
3406 let (_room_event_cache, _drop_handles) =
3407 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3408 }
3409
3410 drop(client);
3411
3412 // Give a bit of time for background tasks to die.
3413 sleep(Duration::from_secs(1)).await;
3414
3415 // The weak client must be the last reference to the client now.
3416 assert_eq!(weak_client.strong_count(), 0);
3417 let client = weak_client.get();
3418 assert!(
3419 client.is_none(),
3420 "too many strong references to the client: {}",
3421 Arc::strong_count(&client.unwrap().inner)
3422 );
3423 }
3424
3425 #[async_test]
3426 async fn test_server_info_caching() {
3427 let server = MatrixMockServer::new().await;
3428 let server_url = server.uri();
3429 let domain = server_url.strip_prefix("http://").unwrap();
3430 let server_name = <&ServerName>::try_from(domain).unwrap();
3431 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3432
3433 let well_known_mock = server
3434 .mock_well_known()
3435 .ok()
3436 .named("well known mock")
3437 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3438 .mount_as_scoped()
3439 .await;
3440
3441 let versions_mock = server
3442 .mock_versions()
3443 .ok_with_unstable_features()
3444 .named("first versions mock")
3445 .expect(1)
3446 .mount_as_scoped()
3447 .await;
3448
3449 let memory_store = Arc::new(MemoryStore::new());
3450 let client = Client::builder()
3451 .insecure_server_name_no_tls(server_name)
3452 .store_config(
3453 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3454 .state_store(memory_store.clone()),
3455 )
3456 .build()
3457 .await
3458 .unwrap();
3459
3460 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3461
3462 // These subsequent calls hit the in-memory cache.
3463 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3464 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3465
3466 drop(client);
3467
3468 let client = server
3469 .client_builder()
3470 .no_server_versions()
3471 .on_builder(|builder| {
3472 builder.store_config(
3473 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3474 .state_store(memory_store.clone()),
3475 )
3476 })
3477 .build()
3478 .await;
3479
3480 // These calls to the new client hit the on-disk cache.
3481 assert!(
3482 client
3483 .unstable_features()
3484 .await
3485 .unwrap()
3486 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3487 );
3488 let supported = client.supported_versions().await.unwrap();
3489 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3490 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3491
3492 // Then this call hits the in-memory cache.
3493 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3494
3495 drop(versions_mock);
3496 drop(well_known_mock);
3497
3498 // Now, reset the cache, and observe the endpoints being called again once.
3499 client.reset_server_info().await.unwrap();
3500
3501 server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3502
3503 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3504
3505 // Hits network again.
3506 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3507 // Hits in-memory cache again.
3508 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3509 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3510 }
3511
3512 #[async_test]
3513 async fn test_server_info_without_a_well_known() {
3514 let server = MatrixMockServer::new().await;
3515 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3516
3517 let versions_mock = server
3518 .mock_versions()
3519 .ok_with_unstable_features()
3520 .named("first versions mock")
3521 .expect(1)
3522 .mount_as_scoped()
3523 .await;
3524
3525 let memory_store = Arc::new(MemoryStore::new());
3526 let client = server
3527 .client_builder()
3528 .no_server_versions()
3529 .on_builder(|builder| {
3530 builder.store_config(
3531 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3532 .state_store(memory_store.clone()),
3533 )
3534 })
3535 .build()
3536 .await;
3537
3538 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3539
3540 // These subsequent calls hit the in-memory cache.
3541 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3542 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3543
3544 drop(client);
3545
3546 let client = server
3547 .client_builder()
3548 .no_server_versions()
3549 .on_builder(|builder| {
3550 builder.store_config(
3551 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3552 .state_store(memory_store.clone()),
3553 )
3554 })
3555 .build()
3556 .await;
3557
3558 // This call to the new client hits the on-disk cache.
3559 assert!(
3560 client
3561 .unstable_features()
3562 .await
3563 .unwrap()
3564 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3565 );
3566
3567 // Then this call hits the in-memory cache.
3568 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3569
3570 drop(versions_mock);
3571
3572 // Now, reset the cache, and observe the endpoints being called again once.
3573 client.reset_server_info().await.unwrap();
3574
3575 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3576
3577 // Hits network again.
3578 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3579 // Hits in-memory cache again.
3580 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3581 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3582 }
3583
3584 #[async_test]
3585 async fn test_no_network_doesnt_cause_infinite_retries() {
3586 // We want infinite retries for transient errors.
3587 let client = MockClientBuilder::new(None)
3588 .on_builder(|builder| builder.request_config(RequestConfig::new()))
3589 .build()
3590 .await;
3591
3592 // We don't define a mock server on purpose here, so that the error is really a
3593 // network error.
3594 client.whoami().await.unwrap_err();
3595 }
3596
3597 #[async_test]
3598 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3599 let server = MatrixMockServer::new().await;
3600 let client = server.client_builder().build().await;
3601
3602 let room_id = room_id!("!room:example.org");
3603
3604 server
3605 .mock_sync()
3606 .ok_and_run(&client, |builder| {
3607 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3608 })
3609 .await;
3610
3611 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3612 assert_eq!(room.room_id(), room_id);
3613 }
3614
3615 #[async_test]
3616 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3617 let server = MatrixMockServer::new().await;
3618 let client = server.client_builder().build().await;
3619
3620 let room_id = room_id!("!room:example.org");
3621
3622 let client = Arc::new(client);
3623
3624 // Perform the /sync request with a delay so it starts after the
3625 // `await_room_remote_echo` call has happened
3626 spawn({
3627 let client = client.clone();
3628 async move {
3629 sleep(Duration::from_millis(100)).await;
3630
3631 server
3632 .mock_sync()
3633 .ok_and_run(&client, |builder| {
3634 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3635 })
3636 .await;
3637 }
3638 });
3639
3640 let room =
3641 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3642 assert_eq!(room.room_id(), room_id);
3643 }
3644
3645 #[async_test]
3646 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3647 let client = MockClientBuilder::new(None).build().await;
3648
3649 let room_id = room_id!("!room:example.org");
3650 // Room is not present so the client won't be able to find it. The call will
3651 // timeout.
3652 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3653 }
3654
3655 #[async_test]
3656 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3657 let server = MatrixMockServer::new().await;
3658 let client = server.client_builder().build().await;
3659
3660 server.mock_create_room().ok().mount().await;
3661
3662 // Create a room in the internal store
3663 let room = client
3664 .create_room(assign!(CreateRoomRequest::new(), {
3665 invite: vec![],
3666 is_direct: false,
3667 }))
3668 .await
3669 .unwrap();
3670
3671 // Room is locally present, but not synced, the call will timeout
3672 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3673 .await
3674 .unwrap_err();
3675 }
3676
3677 #[async_test]
3678 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3679 let server = MatrixMockServer::new().await;
3680 let client = server.client_builder().build().await;
3681
3682 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3683
3684 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3685 assert_matches!(ret, Ok(true));
3686 }
3687
3688 #[async_test]
3689 async fn test_is_room_alias_available_if_alias_is_resolved() {
3690 let server = MatrixMockServer::new().await;
3691 let client = server.client_builder().build().await;
3692
3693 server
3694 .mock_room_directory_resolve_alias()
3695 .ok("!some_room_id:matrix.org", Vec::new())
3696 .expect(1)
3697 .mount()
3698 .await;
3699
3700 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3701 assert_matches!(ret, Ok(false));
3702 }
3703
3704 #[async_test]
3705 async fn test_is_room_alias_available_if_error_found() {
3706 let server = MatrixMockServer::new().await;
3707 let client = server.client_builder().build().await;
3708
3709 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3710
3711 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3712 assert_matches!(ret, Err(_));
3713 }
3714
3715 #[async_test]
3716 async fn test_create_room_alias() {
3717 let server = MatrixMockServer::new().await;
3718 let client = server.client_builder().build().await;
3719
3720 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3721
3722 let ret = client
3723 .create_room_alias(
3724 room_alias_id!("#some_alias:matrix.org"),
3725 room_id!("!some_room:matrix.org"),
3726 )
3727 .await;
3728 assert_matches!(ret, Ok(()));
3729 }
3730
3731 #[async_test]
3732 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3733 let server = MatrixMockServer::new().await;
3734 let client = server.client_builder().build().await;
3735
3736 let room_id = room_id!("!a-room:matrix.org");
3737
3738 // Make sure the summary endpoint is called once
3739 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3740
3741 // We create a locally cached invited room
3742 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3743
3744 // And we get a preview, the server endpoint was reached
3745 let preview = client
3746 .get_room_preview(room_id.into(), Vec::new())
3747 .await
3748 .expect("Room preview should be retrieved");
3749
3750 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3751 }
3752
3753 #[async_test]
3754 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3755 let server = MatrixMockServer::new().await;
3756 let client = server.client_builder().build().await;
3757
3758 let room_id = room_id!("!a-room:matrix.org");
3759
3760 // Make sure the summary endpoint is called once
3761 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3762
3763 // We create a locally cached left room
3764 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3765
3766 // And we get a preview, the server endpoint was reached
3767 let preview = client
3768 .get_room_preview(room_id.into(), Vec::new())
3769 .await
3770 .expect("Room preview should be retrieved");
3771
3772 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3773 }
3774
3775 #[async_test]
3776 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3777 let server = MatrixMockServer::new().await;
3778 let client = server.client_builder().build().await;
3779
3780 let room_id = room_id!("!a-room:matrix.org");
3781
3782 // Make sure the summary endpoint is called once
3783 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3784
3785 // We create a locally cached knocked room
3786 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3787
3788 // And we get a preview, the server endpoint was reached
3789 let preview = client
3790 .get_room_preview(room_id.into(), Vec::new())
3791 .await
3792 .expect("Room preview should be retrieved");
3793
3794 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3795 }
3796
3797 #[async_test]
3798 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3799 let server = MatrixMockServer::new().await;
3800 let client = server.client_builder().build().await;
3801
3802 let room_id = room_id!("!a-room:matrix.org");
3803
3804 // Make sure the summary endpoint is not called
3805 server.mock_room_summary().ok(room_id).never().mount().await;
3806
3807 // We create a locally cached joined room
3808 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3809
3810 // And we get a preview, no server endpoint was reached
3811 let preview = client
3812 .get_room_preview(room_id.into(), Vec::new())
3813 .await
3814 .expect("Room preview should be retrieved");
3815
3816 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3817 }
3818
3819 #[async_test]
3820 async fn test_media_preview_config() {
3821 let server = MatrixMockServer::new().await;
3822 let client = server.client_builder().build().await;
3823
3824 server
3825 .mock_sync()
3826 .ok_and_run(&client, |builder| {
3827 builder.add_custom_global_account_data(json!({
3828 "content": {
3829 "media_previews": "private",
3830 "invite_avatars": "off"
3831 },
3832 "type": "m.media_preview_config"
3833 }));
3834 })
3835 .await;
3836
3837 let (initial_value, stream) =
3838 client.account().observe_media_preview_config().await.unwrap();
3839
3840 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3841 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3842 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3843 pin_mut!(stream);
3844 assert_pending!(stream);
3845
3846 server
3847 .mock_sync()
3848 .ok_and_run(&client, |builder| {
3849 builder.add_custom_global_account_data(json!({
3850 "content": {
3851 "media_previews": "off",
3852 "invite_avatars": "on"
3853 },
3854 "type": "m.media_preview_config"
3855 }));
3856 })
3857 .await;
3858
3859 assert_next_matches!(
3860 stream,
3861 MediaPreviewConfigEventContent {
3862 media_previews: Some(MediaPreviews::Off),
3863 invite_avatars: Some(InviteAvatars::On),
3864 ..
3865 }
3866 );
3867 assert_pending!(stream);
3868 }
3869
3870 #[async_test]
3871 async fn test_unstable_media_preview_config() {
3872 let server = MatrixMockServer::new().await;
3873 let client = server.client_builder().build().await;
3874
3875 server
3876 .mock_sync()
3877 .ok_and_run(&client, |builder| {
3878 builder.add_custom_global_account_data(json!({
3879 "content": {
3880 "media_previews": "private",
3881 "invite_avatars": "off"
3882 },
3883 "type": "io.element.msc4278.media_preview_config"
3884 }));
3885 })
3886 .await;
3887
3888 let (initial_value, stream) =
3889 client.account().observe_media_preview_config().await.unwrap();
3890
3891 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3892 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3893 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3894 pin_mut!(stream);
3895 assert_pending!(stream);
3896
3897 server
3898 .mock_sync()
3899 .ok_and_run(&client, |builder| {
3900 builder.add_custom_global_account_data(json!({
3901 "content": {
3902 "media_previews": "off",
3903 "invite_avatars": "on"
3904 },
3905 "type": "io.element.msc4278.media_preview_config"
3906 }));
3907 })
3908 .await;
3909
3910 assert_next_matches!(
3911 stream,
3912 MediaPreviewConfigEventContent {
3913 media_previews: Some(MediaPreviews::Off),
3914 invite_avatars: Some(InviteAvatars::On),
3915 ..
3916 }
3917 );
3918 assert_pending!(stream);
3919 }
3920
3921 #[async_test]
3922 async fn test_media_preview_config_not_found() {
3923 let server = MatrixMockServer::new().await;
3924 let client = server.client_builder().build().await;
3925
3926 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3927
3928 assert!(initial_value.is_none());
3929 }
3930
3931 #[async_test]
3932 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3933 // The default Matrix version we use is 1.11 or higher, so authenticated media
3934 // is supported.
3935 let server = MatrixMockServer::new().await;
3936 let client = server.client_builder().build().await;
3937
3938 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3939
3940 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3941 client.load_or_fetch_max_upload_size().await.unwrap();
3942
3943 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3944 }
3945
3946 #[async_test]
3947 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3948 // The server must advertise support for the stable feature for authenticated
3949 // media support, so we mock the `GET /versions` response.
3950 let server = MatrixMockServer::new().await;
3951 let client = server.client_builder().no_server_versions().build().await;
3952
3953 server
3954 .mock_versions()
3955 .ok_custom(
3956 &["v1.7", "v1.8", "v1.9", "v1.10"],
3957 &[("org.matrix.msc3916.stable", true)].into(),
3958 )
3959 .named("versions")
3960 .expect(1)
3961 .mount()
3962 .await;
3963
3964 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3965
3966 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3967 client.load_or_fetch_max_upload_size().await.unwrap();
3968
3969 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3970 }
3971
3972 #[async_test]
3973 async fn test_load_or_fetch_max_upload_size_no_auth() {
3974 // The server must not support Matrix 1.11 or higher for unauthenticated
3975 // media requests, so we mock the `GET /versions` response.
3976 let server = MatrixMockServer::new().await;
3977 let client = server.client_builder().no_server_versions().build().await;
3978
3979 server
3980 .mock_versions()
3981 .ok_custom(&["v1.1"], &Default::default())
3982 .named("versions")
3983 .expect(1)
3984 .mount()
3985 .await;
3986
3987 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3988
3989 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
3990 client.load_or_fetch_max_upload_size().await.unwrap();
3991
3992 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3993 }
3994
3995 #[async_test]
3996 async fn test_uploading_a_too_large_media_file() {
3997 let server = MatrixMockServer::new().await;
3998 let client = server.client_builder().build().await;
3999
4000 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4001 client.load_or_fetch_max_upload_size().await.unwrap();
4002 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4003
4004 let data = vec![1, 2];
4005 let upload_request =
4006 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4007 let request = SendRequest {
4008 client: client.clone(),
4009 request: upload_request,
4010 config: None,
4011 send_progress: SharedObservable::new(TransmissionProgress::default()),
4012 };
4013 let media_request = SendMediaUploadRequest::new(request);
4014
4015 let error = media_request.await.err();
4016 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4017 assert_eq!(max, uint!(1));
4018 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4019 }
4020
4021 #[async_test]
4022 async fn test_dont_ignore_timeout_on_first_sync() {
4023 let server = MatrixMockServer::new().await;
4024 let client = server.client_builder().build().await;
4025
4026 server
4027 .mock_sync()
4028 .timeout(Some(Duration::from_secs(30)))
4029 .ok(|_| {})
4030 .mock_once()
4031 .named("sync_with_timeout")
4032 .mount()
4033 .await;
4034
4035 // Call the endpoint once to check the timeout.
4036 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4037
4038 timeout(Duration::from_secs(1), async {
4039 stream.next().await.unwrap().unwrap();
4040 })
4041 .await
4042 .unwrap();
4043 }
4044
4045 #[async_test]
4046 async fn test_ignore_timeout_on_first_sync() {
4047 let server = MatrixMockServer::new().await;
4048 let client = server.client_builder().build().await;
4049
4050 server
4051 .mock_sync()
4052 .timeout(None)
4053 .ok(|_| {})
4054 .mock_once()
4055 .named("sync_no_timeout")
4056 .mount()
4057 .await;
4058 server
4059 .mock_sync()
4060 .timeout(Some(Duration::from_secs(30)))
4061 .ok(|_| {})
4062 .mock_once()
4063 .named("sync_with_timeout")
4064 .mount()
4065 .await;
4066
4067 // Call each version of the endpoint once to check the timeouts.
4068 let mut stream = Box::pin(
4069 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4070 );
4071
4072 timeout(Duration::from_secs(1), async {
4073 stream.next().await.unwrap().unwrap();
4074 stream.next().await.unwrap().unwrap();
4075 })
4076 .await
4077 .unwrap();
4078 }
4079}