matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{btree_map, BTreeMap},
19 fmt::{self, Debug},
20 future::{ready, Future},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23};
24
25use caches::ClientCaches;
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::store::LockableCryptoStore;
32use matrix_sdk_base::{
33 event_cache::store::EventCacheStoreLock,
34 store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
35 sync::{Notification, RoomUpdates},
36 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
37 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm,
38};
39use matrix_sdk_common::ttl_cache::TtlCache;
40#[cfg(feature = "e2e-encryption")]
41use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
42use ruma::{
43 api::{
44 client::{
45 account::whoami,
46 alias::{create_alias, delete_alias, get_alias},
47 device::{delete_devices, get_devices, update_device},
48 directory::{get_public_rooms, get_public_rooms_filtered},
49 discovery::{
50 discover_homeserver,
51 discover_homeserver::RtcFocusInfo,
52 get_capabilities::{self, Capabilities},
53 get_supported_versions,
54 },
55 error::ErrorKind,
56 filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
57 knock::knock_room,
58 membership::{join_room_by_id, join_room_by_id_or_alias},
59 room::create_room,
60 session::login::v3::DiscoveryInfo,
61 sync::sync_events,
62 uiaa,
63 user_directory::search_users,
64 },
65 error::FromHttpResponseError,
66 MatrixVersion, OutgoingRequest,
67 },
68 assign,
69 push::Ruleset,
70 time::Instant,
71 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
72 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
73};
74use serde::de::DeserializeOwned;
75use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
76use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
77use url::Url;
78
79use self::futures::SendRequest;
80use crate::{
81 authentication::{
82 matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
83 SaveSessionCallback,
84 },
85 config::RequestConfig,
86 deduplicating_handler::DeduplicatingHandler,
87 error::HttpResult,
88 event_cache::EventCache,
89 event_handler::{
90 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
91 EventHandlerStore, ObservableEventHandler, SyncEvent,
92 },
93 http_client::HttpClient,
94 media::MediaError,
95 notification_settings::NotificationSettings,
96 room_preview::RoomPreview,
97 send_queue::SendQueueData,
98 sliding_sync::Version as SlidingSyncVersion,
99 sync::{RoomUpdate, SyncResponse},
100 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
101 Room, SessionTokens, TransmissionProgress,
102};
103#[cfg(feature = "e2e-encryption")]
104use crate::{
105 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
106 store_locks::CrossProcessStoreLock,
107};
108
109mod builder;
110pub(crate) mod caches;
111pub(crate) mod futures;
112
113pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
114
115#[cfg(not(target_family = "wasm"))]
116type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
117#[cfg(target_family = "wasm")]
118type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
119
120#[cfg(not(target_family = "wasm"))]
121type NotificationHandlerFn =
122 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
123#[cfg(target_family = "wasm")]
124type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
125
126/// Enum controlling if a loop running callbacks should continue or abort.
127///
128/// This is mainly used in the [`sync_with_callback`] method, the return value
129/// of the provided callback controls if the sync loop should be exited.
130///
131/// [`sync_with_callback`]: #method.sync_with_callback
132#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133pub enum LoopCtrl {
134 /// Continue running the loop.
135 Continue,
136 /// Break out of the loop.
137 Break,
138}
139
140/// Represents changes that can occur to a `Client`s `Session`.
141#[derive(Debug, Clone, PartialEq)]
142pub enum SessionChange {
143 /// The session's token is no longer valid.
144 UnknownToken {
145 /// Whether or not the session was soft logged out
146 soft_logout: bool,
147 },
148 /// The session's tokens have been refreshed.
149 TokensRefreshed,
150}
151
152/// An async/await enabled Matrix client.
153///
154/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
155#[derive(Clone)]
156pub struct Client {
157 pub(crate) inner: Arc<ClientInner>,
158}
159
160#[derive(Default)]
161pub(crate) struct ClientLocks {
162 /// Lock ensuring that only a single room may be marked as a DM at once.
163 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
164 /// explanation.
165 pub(crate) mark_as_dm_lock: Mutex<()>,
166
167 /// Lock ensuring that only a single secret store is getting opened at the
168 /// same time.
169 ///
170 /// This is important so we don't accidentally create multiple different new
171 /// default secret storage keys.
172 #[cfg(feature = "e2e-encryption")]
173 pub(crate) open_secret_store_lock: Mutex<()>,
174
175 /// Lock ensuring that we're only storing a single secret at a time.
176 ///
177 /// Take a look at the [`SecretStore::put_secret`] method for a more
178 /// detailed explanation.
179 ///
180 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
181 #[cfg(feature = "e2e-encryption")]
182 pub(crate) store_secret_lock: Mutex<()>,
183
184 /// Lock ensuring that only one method at a time might modify our backup.
185 #[cfg(feature = "e2e-encryption")]
186 pub(crate) backup_modify_lock: Mutex<()>,
187
188 /// Lock ensuring that we're going to attempt to upload backups for a single
189 /// requester.
190 #[cfg(feature = "e2e-encryption")]
191 pub(crate) backup_upload_lock: Mutex<()>,
192
193 /// Handler making sure we only have one group session sharing request in
194 /// flight per room.
195 #[cfg(feature = "e2e-encryption")]
196 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
197
198 /// Lock making sure we're only doing one key claim request at a time.
199 #[cfg(feature = "e2e-encryption")]
200 pub(crate) key_claim_lock: Mutex<()>,
201
202 /// Handler to ensure that only one members request is running at a time,
203 /// given a room.
204 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
205
206 /// Handler to ensure that only one encryption state request is running at a
207 /// time, given a room.
208 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
209
210 /// Deduplicating handler for sending read receipts. The string is an
211 /// internal implementation detail, see [`Self::send_single_receipt`].
212 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
213
214 #[cfg(feature = "e2e-encryption")]
215 pub(crate) cross_process_crypto_store_lock:
216 OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,
217
218 /// Latest "generation" of data known by the crypto store.
219 ///
220 /// This is a counter that only increments, set in the database (and can
221 /// wrap). It's incremented whenever some process acquires a lock for the
222 /// first time. *This assumes the crypto store lock is being held, to
223 /// avoid data races on writing to this value in the store*.
224 ///
225 /// The current process will maintain this value in local memory and in the
226 /// DB over time. Observing a different value than the one read in
227 /// memory, when reading from the store indicates that somebody else has
228 /// written into the database under our feet.
229 ///
230 /// TODO: this should live in the `OlmMachine`, since it's information
231 /// related to the lock. As of today (2023-07-28), we blow up the entire
232 /// olm machine when there's a generation mismatch. So storing the
233 /// generation in the olm machine would make the client think there's
234 /// *always* a mismatch, and that's why we need to store the generation
235 /// outside the `OlmMachine`.
236 #[cfg(feature = "e2e-encryption")]
237 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
238}
239
240pub(crate) struct ClientInner {
241 /// All the data related to authentication and authorization.
242 pub(crate) auth_ctx: Arc<AuthCtx>,
243
244 /// The URL of the server.
245 ///
246 /// Not to be confused with the `Self::homeserver`. `server` is usually
247 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
248 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
249 /// homeserver (at the time of writing — 2024-08-28).
250 ///
251 /// This value is optional depending on how the `Client` has been built.
252 /// If it's been built from a homeserver URL directly, we don't know the
253 /// server. However, if the `Client` has been built from a server URL or
254 /// name, then the homeserver has been discovered, and we know both.
255 server: Option<Url>,
256
257 /// The URL of the homeserver to connect to.
258 ///
259 /// This is the URL for the client-server Matrix API.
260 homeserver: StdRwLock<Url>,
261
262 /// The sliding sync version.
263 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
264
265 /// The underlying HTTP client.
266 pub(crate) http_client: HttpClient,
267
268 /// User session data.
269 pub(super) base_client: BaseClient,
270
271 /// Collection of in-memory caches for the [`Client`].
272 pub(crate) caches: ClientCaches,
273
274 /// Collection of locks individual client methods might want to use, either
275 /// to ensure that only a single call to a method happens at once or to
276 /// deduplicate multiple calls to a method.
277 pub(crate) locks: ClientLocks,
278
279 /// The cross-process store locks holder name.
280 ///
281 /// The SDK provides cross-process store locks (see
282 /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
283 /// `holder_name` is the value used for all cross-process store locks
284 /// used by this `Client`.
285 ///
286 /// If multiple `Client`s are running in different processes, this
287 /// value MUST be different for each `Client`.
288 cross_process_store_locks_holder_name: String,
289
290 /// A mapping of the times at which the current user sent typing notices,
291 /// keyed by room.
292 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
293
294 /// Event handlers. See `add_event_handler`.
295 pub(crate) event_handlers: EventHandlerStore,
296
297 /// Notification handlers. See `register_notification_handler`.
298 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
299
300 /// The sender-side of channels used to receive room updates.
301 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
302
303 /// The sender-side of a channel used to observe all the room updates of a
304 /// sync response.
305 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
306
307 /// Whether the client should update its homeserver URL with the discovery
308 /// information present in the login response.
309 respect_login_well_known: bool,
310
311 /// An event that can be listened on to wait for a successful sync. The
312 /// event will only be fired if a sync loop is running. Can be used for
313 /// synchronization, e.g. if we send out a request to create a room, we can
314 /// wait for the sync to get the data to fetch a room object from the state
315 /// store.
316 pub(crate) sync_beat: event_listener::Event,
317
318 /// A central cache for events, inactive first.
319 ///
320 /// It becomes active when [`EventCache::subscribe`] is called.
321 pub(crate) event_cache: OnceCell<EventCache>,
322
323 /// End-to-end encryption related state.
324 #[cfg(feature = "e2e-encryption")]
325 pub(crate) e2ee: EncryptionData,
326
327 /// The verification state of our own device.
328 #[cfg(feature = "e2e-encryption")]
329 pub(crate) verification_state: SharedObservable<VerificationState>,
330
331 /// Whether to enable the experimental support for sending and receiving
332 /// encrypted room history on invite, per [MSC4268].
333 ///
334 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
335 #[cfg(feature = "e2e-encryption")]
336 pub(crate) enable_share_history_on_invite: bool,
337
338 /// Data related to the [`SendQueue`].
339 ///
340 /// [`SendQueue`]: crate::send_queue::SendQueue
341 pub(crate) send_queue_data: Arc<SendQueueData>,
342
343 /// The `max_upload_size` value of the homeserver, it contains the max
344 /// request size you can send.
345 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
346}
347
348impl ClientInner {
349 /// Create a new `ClientInner`.
350 ///
351 /// All the fields passed as parameters here are those that must be cloned
352 /// upon instantiation of a sub-client, e.g. a client specialized for
353 /// notifications.
354 #[allow(clippy::too_many_arguments)]
355 async fn new(
356 auth_ctx: Arc<AuthCtx>,
357 server: Option<Url>,
358 homeserver: Url,
359 sliding_sync_version: SlidingSyncVersion,
360 http_client: HttpClient,
361 base_client: BaseClient,
362 server_info: ClientServerInfo,
363 respect_login_well_known: bool,
364 event_cache: OnceCell<EventCache>,
365 send_queue: Arc<SendQueueData>,
366 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
367 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
368 cross_process_store_locks_holder_name: String,
369 ) -> Arc<Self> {
370 let caches = ClientCaches {
371 server_info: server_info.into(),
372 server_metadata: Mutex::new(TtlCache::new()),
373 };
374
375 let client = Self {
376 server,
377 homeserver: StdRwLock::new(homeserver),
378 auth_ctx,
379 sliding_sync_version: StdRwLock::new(sliding_sync_version),
380 http_client,
381 base_client,
382 caches,
383 locks: Default::default(),
384 cross_process_store_locks_holder_name,
385 typing_notice_times: Default::default(),
386 event_handlers: Default::default(),
387 notification_handlers: Default::default(),
388 room_update_channels: Default::default(),
389 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
390 // ballast for all observers to catch up.
391 room_updates_sender: broadcast::Sender::new(32),
392 respect_login_well_known,
393 sync_beat: event_listener::Event::new(),
394 event_cache,
395 send_queue_data: send_queue,
396 #[cfg(feature = "e2e-encryption")]
397 e2ee: EncryptionData::new(encryption_settings),
398 #[cfg(feature = "e2e-encryption")]
399 verification_state: SharedObservable::new(VerificationState::Unknown),
400 #[cfg(feature = "e2e-encryption")]
401 enable_share_history_on_invite,
402 server_max_upload_size: Mutex::new(OnceCell::new()),
403 };
404
405 #[allow(clippy::let_and_return)]
406 let client = Arc::new(client);
407
408 #[cfg(feature = "e2e-encryption")]
409 client.e2ee.initialize_room_key_tasks(&client);
410
411 let _ = client
412 .event_cache
413 .get_or_init(|| async {
414 EventCache::new(
415 WeakClient::from_inner(&client),
416 client.base_client.event_cache_store().clone(),
417 )
418 })
419 .await;
420
421 client
422 }
423}
424
425#[cfg(not(tarpaulin_include))]
426impl Debug for Client {
427 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
428 write!(fmt, "Client")
429 }
430}
431
432impl Client {
433 /// Create a new [`Client`] that will use the given homeserver.
434 ///
435 /// # Arguments
436 ///
437 /// * `homeserver_url` - The homeserver that the client should connect to.
438 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
439 Self::builder().homeserver_url(homeserver_url).build().await
440 }
441
442 /// Returns a subscriber that publishes an event every time the ignore user
443 /// list changes.
444 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
445 self.inner.base_client.subscribe_to_ignore_user_list_changes()
446 }
447
448 /// Create a new [`ClientBuilder`].
449 pub fn builder() -> ClientBuilder {
450 ClientBuilder::new()
451 }
452
453 pub(crate) fn base_client(&self) -> &BaseClient {
454 &self.inner.base_client
455 }
456
457 /// The underlying HTTP client.
458 pub fn http_client(&self) -> &reqwest::Client {
459 &self.inner.http_client.inner
460 }
461
462 pub(crate) fn locks(&self) -> &ClientLocks {
463 &self.inner.locks
464 }
465
466 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
467 &self.inner.auth_ctx
468 }
469
470 /// The cross-process store locks holder name.
471 ///
472 /// The SDK provides cross-process store locks (see
473 /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
474 /// `holder_name` is the value used for all cross-process store locks
475 /// used by this `Client`.
476 pub fn cross_process_store_locks_holder_name(&self) -> &str {
477 &self.inner.cross_process_store_locks_holder_name
478 }
479
480 /// Change the homeserver URL used by this client.
481 ///
482 /// # Arguments
483 ///
484 /// * `homeserver_url` - The new URL to use.
485 fn set_homeserver(&self, homeserver_url: Url) {
486 *self.inner.homeserver.write().unwrap() = homeserver_url;
487 }
488
489 /// Get the capabilities of the homeserver.
490 ///
491 /// This method should be used to check what features are supported by the
492 /// homeserver.
493 ///
494 /// # Examples
495 ///
496 /// ```no_run
497 /// # use matrix_sdk::Client;
498 /// # use url::Url;
499 /// # async {
500 /// # let homeserver = Url::parse("http://example.com")?;
501 /// let client = Client::new(homeserver).await?;
502 ///
503 /// let capabilities = client.get_capabilities().await?;
504 ///
505 /// if capabilities.change_password.enabled {
506 /// // Change password
507 /// }
508 /// # anyhow::Ok(()) };
509 /// ```
510 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
511 let res = self.send(get_capabilities::v3::Request::new()).await?;
512 Ok(res.capabilities)
513 }
514
515 /// Get a copy of the default request config.
516 ///
517 /// The default request config is what's used when sending requests if no
518 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
519 /// function with such a parameter.
520 ///
521 /// If the default request config was not customized through
522 /// [`ClientBuilder`] when creating this `Client`, the returned value will
523 /// be equivalent to [`RequestConfig::default()`].
524 pub fn request_config(&self) -> RequestConfig {
525 self.inner.http_client.request_config
526 }
527
528 /// Check whether the client has been activated.
529 ///
530 /// A client is considered active when:
531 ///
532 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
533 /// is logged in,
534 /// 2. Has loaded cached data from storage,
535 /// 3. If encryption is enabled, it also initialized or restored its
536 /// `OlmMachine`.
537 pub fn is_active(&self) -> bool {
538 self.inner.base_client.is_active()
539 }
540
541 /// The server used by the client.
542 ///
543 /// See `Self::server` to learn more.
544 pub fn server(&self) -> Option<&Url> {
545 self.inner.server.as_ref()
546 }
547
548 /// The homeserver of the client.
549 pub fn homeserver(&self) -> Url {
550 self.inner.homeserver.read().unwrap().clone()
551 }
552
553 /// Get the sliding sync version.
554 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
555 self.inner.sliding_sync_version.read().unwrap().clone()
556 }
557
558 /// Override the sliding sync version.
559 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
560 let mut lock = self.inner.sliding_sync_version.write().unwrap();
561 *lock = version;
562 }
563
564 /// Get the Matrix user session meta information.
565 ///
566 /// If the client is currently logged in, this will return a
567 /// [`SessionMeta`] object which contains the user ID and device ID.
568 /// Otherwise it returns `None`.
569 pub fn session_meta(&self) -> Option<&SessionMeta> {
570 self.base_client().session_meta()
571 }
572
573 /// Returns a receiver that gets events for each room info update. To watch
574 /// for new events, use `receiver.resubscribe()`. Each event contains the
575 /// room and a boolean whether this event should trigger a room list update.
576 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
577 self.base_client().room_info_notable_update_receiver()
578 }
579
580 /// Performs a search for users.
581 /// The search is performed case-insensitively on user IDs and display names
582 ///
583 /// # Arguments
584 ///
585 /// * `search_term` - The search term for the search
586 /// * `limit` - The maximum number of results to return. Defaults to 10.
587 ///
588 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
589 pub async fn search_users(
590 &self,
591 search_term: &str,
592 limit: u64,
593 ) -> HttpResult<search_users::v3::Response> {
594 let mut request = search_users::v3::Request::new(search_term.to_owned());
595
596 if let Some(limit) = UInt::new(limit) {
597 request.limit = limit;
598 }
599
600 self.send(request).await
601 }
602
603 /// Get the user id of the current owner of the client.
604 pub fn user_id(&self) -> Option<&UserId> {
605 self.session_meta().map(|s| s.user_id.as_ref())
606 }
607
608 /// Get the device ID that identifies the current session.
609 pub fn device_id(&self) -> Option<&DeviceId> {
610 self.session_meta().map(|s| s.device_id.as_ref())
611 }
612
613 /// Get the current access token for this session.
614 ///
615 /// Will be `None` if the client has not been logged in.
616 pub fn access_token(&self) -> Option<String> {
617 self.auth_ctx().access_token()
618 }
619
620 /// Get the current tokens for this session.
621 ///
622 /// To be notified of changes in the session tokens, use
623 /// [`Client::subscribe_to_session_changes()`] or
624 /// [`Client::set_session_callbacks()`].
625 ///
626 /// Returns `None` if the client has not been logged in.
627 pub fn session_tokens(&self) -> Option<SessionTokens> {
628 self.auth_ctx().session_tokens()
629 }
630
631 /// Access the authentication API used to log in this client.
632 ///
633 /// Will be `None` if the client has not been logged in.
634 pub fn auth_api(&self) -> Option<AuthApi> {
635 match self.auth_ctx().auth_data.get()? {
636 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
637 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
638 }
639 }
640
641 /// Get the whole session info of this client.
642 ///
643 /// Will be `None` if the client has not been logged in.
644 ///
645 /// Can be used with [`Client::restore_session`] to restore a previously
646 /// logged-in session.
647 pub fn session(&self) -> Option<AuthSession> {
648 match self.auth_api()? {
649 AuthApi::Matrix(api) => api.session().map(Into::into),
650 AuthApi::OAuth(api) => api.full_session().map(Into::into),
651 }
652 }
653
654 /// Get a reference to the state store.
655 pub fn state_store(&self) -> &DynStateStore {
656 self.base_client().state_store()
657 }
658
659 /// Get a reference to the event cache store.
660 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
661 self.base_client().event_cache_store()
662 }
663
664 /// Access the native Matrix authentication API with this client.
665 pub fn matrix_auth(&self) -> MatrixAuth {
666 MatrixAuth::new(self.clone())
667 }
668
669 /// Get the account of the current owner of the client.
670 pub fn account(&self) -> Account {
671 Account::new(self.clone())
672 }
673
674 /// Get the encryption manager of the client.
675 #[cfg(feature = "e2e-encryption")]
676 pub fn encryption(&self) -> Encryption {
677 Encryption::new(self.clone())
678 }
679
680 /// Get the media manager of the client.
681 pub fn media(&self) -> Media {
682 Media::new(self.clone())
683 }
684
685 /// Get the pusher manager of the client.
686 pub fn pusher(&self) -> Pusher {
687 Pusher::new(self.clone())
688 }
689
690 /// Access the OAuth 2.0 API of the client.
691 pub fn oauth(&self) -> OAuth {
692 OAuth::new(self.clone())
693 }
694
695 /// Register a handler for a specific event type.
696 ///
697 /// The handler is a function or closure with one or more arguments. The
698 /// first argument is the event itself. All additional arguments are
699 /// "context" arguments: They have to implement [`EventHandlerContext`].
700 /// This trait is named that way because most of the types implementing it
701 /// give additional context about an event: The room it was in, its raw form
702 /// and other similar things. As two exceptions to this,
703 /// [`Client`] and [`EventHandlerHandle`] also implement the
704 /// `EventHandlerContext` trait so you don't have to clone your client
705 /// into the event handler manually and a handler can decide to remove
706 /// itself.
707 ///
708 /// Some context arguments are not universally applicable. A context
709 /// argument that isn't available for the given event type will result in
710 /// the event handler being skipped and an error being logged. The following
711 /// context argument types are only available for a subset of event types:
712 ///
713 /// * [`Room`] is only available for room-specific events, i.e. not for
714 /// events like global account data events or presence events.
715 ///
716 /// You can provide custom context via
717 /// [`add_event_handler_context`](Client::add_event_handler_context) and
718 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
719 /// into the event handler.
720 ///
721 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
722 ///
723 /// # Examples
724 ///
725 /// ```no_run
726 /// use matrix_sdk::{
727 /// deserialized_responses::EncryptionInfo,
728 /// event_handler::Ctx,
729 /// ruma::{
730 /// events::{
731 /// macros::EventContent,
732 /// push_rules::PushRulesEvent,
733 /// room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
734 /// },
735 /// push::Action,
736 /// Int, MilliSecondsSinceUnixEpoch,
737 /// },
738 /// Client, Room,
739 /// };
740 /// use serde::{Deserialize, Serialize};
741 ///
742 /// # async fn example(client: Client) {
743 /// client.add_event_handler(
744 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
745 /// // Common usage: Room event plus room and client.
746 /// },
747 /// );
748 /// client.add_event_handler(
749 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
750 /// async move {
751 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
752 /// // unencrypted events and events that were decrypted by the SDK.
753 /// }
754 /// },
755 /// );
756 /// client.add_event_handler(
757 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
758 /// async move {
759 /// // A `Vec<Action>` parameter allows you to know which push actions
760 /// // are applicable for an event. For example, an event with
761 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
762 /// // in the timeline.
763 /// }
764 /// },
765 /// );
766 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
767 /// // You can omit any or all arguments after the first.
768 /// });
769 ///
770 /// // Registering a temporary event handler:
771 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
772 /// /* Event handler */
773 /// });
774 /// client.remove_event_handler(handle);
775 ///
776 /// // Registering custom event handler context:
777 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
778 /// struct MyContext {
779 /// number: usize,
780 /// }
781 /// client.add_event_handler_context(MyContext { number: 5 });
782 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
783 /// // Use the context
784 /// });
785 ///
786 /// // Custom events work exactly the same way, you just need to declare
787 /// // the content struct and use the EventContent derive macro on it.
788 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
789 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
790 /// struct TokenEventContent {
791 /// token: String,
792 /// #[serde(rename = "exp")]
793 /// expires_at: MilliSecondsSinceUnixEpoch,
794 /// }
795 ///
796 /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
797 /// todo!("Display the token");
798 /// });
799 ///
800 /// // Event handler closures can also capture local variables.
801 /// // Make sure they are cheap to clone though, because they will be cloned
802 /// // every time the closure is called.
803 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
804 ///
805 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
806 /// println!("Calling the handler with identifier {data}");
807 /// });
808 /// # }
809 /// ```
810 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
811 where
812 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
813 H: EventHandler<Ev, Ctx>,
814 {
815 self.add_event_handler_impl(handler, None)
816 }
817
818 /// Register a handler for a specific room, and event type.
819 ///
820 /// This method works the same way as
821 /// [`add_event_handler`][Self::add_event_handler], except that the handler
822 /// will only be called for events in the room with the specified ID. See
823 /// that method for more details on event handler functions.
824 ///
825 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
826 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
827 /// your use case.
828 pub fn add_room_event_handler<Ev, Ctx, H>(
829 &self,
830 room_id: &RoomId,
831 handler: H,
832 ) -> EventHandlerHandle
833 where
834 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
835 H: EventHandler<Ev, Ctx>,
836 {
837 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
838 }
839
840 /// Observe a specific event type.
841 ///
842 /// `Ev` represents the kind of event that will be observed. `Ctx`
843 /// represents the context that will come with the event. It relies on the
844 /// same mechanism as [`Client::add_event_handler`]. The main difference is
845 /// that it returns an [`ObservableEventHandler`] and doesn't require a
846 /// user-defined closure. It is possible to subscribe to the
847 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
848 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
849 /// Ctx)`.
850 ///
851 /// Be careful that only the most recent value can be observed. Subscribers
852 /// are notified when a new value is sent, but there is no guarantee
853 /// that they will see all values.
854 ///
855 /// # Example
856 ///
857 /// Let's see a classical usage:
858 ///
859 /// ```
860 /// use futures_util::StreamExt as _;
861 /// use matrix_sdk::{
862 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
863 /// Client, Room,
864 /// };
865 ///
866 /// # async fn example(client: Client) -> Option<()> {
867 /// let observer =
868 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
869 ///
870 /// let mut subscriber = observer.subscribe();
871 ///
872 /// let (event, (room, push_actions)) = subscriber.next().await?;
873 /// # Some(())
874 /// # }
875 /// ```
876 ///
877 /// Now let's see how to get several contexts that can be useful for you:
878 ///
879 /// ```
880 /// use matrix_sdk::{
881 /// deserialized_responses::EncryptionInfo,
882 /// ruma::{
883 /// events::room::{
884 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
885 /// },
886 /// push::Action,
887 /// },
888 /// Client, Room,
889 /// };
890 ///
891 /// # async fn example(client: Client) {
892 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
893 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
894 ///
895 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
896 /// // to distinguish between unencrypted events and events that were decrypted
897 /// // by the SDK.
898 /// let _ = client
899 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
900 /// );
901 ///
902 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
903 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
904 /// // should be highlighted in the timeline.
905 /// let _ =
906 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
907 ///
908 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
909 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
910 /// # }
911 /// ```
912 ///
913 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
914 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
915 where
916 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
917 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
918 {
919 self.observe_room_events_impl(None)
920 }
921
922 /// Observe a specific room, and event type.
923 ///
924 /// This method works the same way as [`Client::observe_events`], except
925 /// that the observability will only be applied for events in the room with
926 /// the specified ID. See that method for more details.
927 ///
928 /// Be careful that only the most recent value can be observed. Subscribers
929 /// are notified when a new value is sent, but there is no guarantee
930 /// that they will see all values.
931 pub fn observe_room_events<Ev, Ctx>(
932 &self,
933 room_id: &RoomId,
934 ) -> ObservableEventHandler<(Ev, Ctx)>
935 where
936 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
937 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
938 {
939 self.observe_room_events_impl(Some(room_id.to_owned()))
940 }
941
942 /// Shared implementation for `Client::observe_events` and
943 /// `Client::observe_room_events`.
944 fn observe_room_events_impl<Ev, Ctx>(
945 &self,
946 room_id: Option<OwnedRoomId>,
947 ) -> ObservableEventHandler<(Ev, Ctx)>
948 where
949 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
950 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
951 {
952 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
953 // new value.
954 let shared_observable = SharedObservable::new(None);
955
956 ObservableEventHandler::new(
957 shared_observable.clone(),
958 self.event_handler_drop_guard(self.add_event_handler_impl(
959 move |event: Ev, context: Ctx| {
960 shared_observable.set(Some((event, context)));
961
962 ready(())
963 },
964 room_id,
965 )),
966 )
967 }
968
969 /// Remove the event handler associated with the handle.
970 ///
971 /// Note that you **must not** call `remove_event_handler` from the
972 /// non-async part of an event handler, that is:
973 ///
974 /// ```ignore
975 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
976 /// // ⚠ this will cause a deadlock ⚠
977 /// client.remove_event_handler(handle);
978 ///
979 /// async move {
980 /// // removing the event handler here is fine
981 /// client.remove_event_handler(handle);
982 /// }
983 /// })
984 /// ```
985 ///
986 /// Note also that handlers that remove themselves will still execute with
987 /// events received in the same sync cycle.
988 ///
989 /// # Arguments
990 ///
991 /// `handle` - The [`EventHandlerHandle`] that is returned when
992 /// registering the event handler with [`Client::add_event_handler`].
993 ///
994 /// # Examples
995 ///
996 /// ```no_run
997 /// # use url::Url;
998 /// # use tokio::sync::mpsc;
999 /// #
1000 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1001 /// #
1002 /// use matrix_sdk::{
1003 /// event_handler::EventHandlerHandle,
1004 /// ruma::events::room::member::SyncRoomMemberEvent, Client,
1005 /// };
1006 /// #
1007 /// # futures_executor::block_on(async {
1008 /// # let client = matrix_sdk::Client::builder()
1009 /// # .homeserver_url(homeserver)
1010 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1011 /// # .build()
1012 /// # .await
1013 /// # .unwrap();
1014 ///
1015 /// client.add_event_handler(
1016 /// |ev: SyncRoomMemberEvent,
1017 /// client: Client,
1018 /// handle: EventHandlerHandle| async move {
1019 /// // Common usage: Check arriving Event is the expected one
1020 /// println!("Expected RoomMemberEvent received!");
1021 /// client.remove_event_handler(handle);
1022 /// },
1023 /// );
1024 /// # });
1025 /// ```
1026 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1027 self.inner.event_handlers.remove(handle);
1028 }
1029
1030 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1031 /// the given handle.
1032 ///
1033 /// When the returned value is dropped, the event handler will be removed.
1034 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1035 EventHandlerDropGuard::new(handle, self.clone())
1036 }
1037
1038 /// Add an arbitrary value for use as event handler context.
1039 ///
1040 /// The value can be obtained in an event handler by adding an argument of
1041 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1042 ///
1043 /// If a value of the same type has been added before, it will be
1044 /// overwritten.
1045 ///
1046 /// # Examples
1047 ///
1048 /// ```no_run
1049 /// use matrix_sdk::{
1050 /// event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1051 /// Room,
1052 /// };
1053 /// # #[derive(Clone)]
1054 /// # struct SomeType;
1055 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1056 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1057 /// # futures_executor::block_on(async {
1058 /// # let client = matrix_sdk::Client::builder()
1059 /// # .homeserver_url(homeserver)
1060 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1061 /// # .build()
1062 /// # .await
1063 /// # .unwrap();
1064 ///
1065 /// // Handle used to send messages to the UI part of the app
1066 /// let my_gui_handle: SomeType = obtain_gui_handle();
1067 ///
1068 /// client.add_event_handler_context(my_gui_handle.clone());
1069 /// client.add_event_handler(
1070 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1071 /// async move {
1072 /// // gui_handle.send(DisplayMessage { message: ev });
1073 /// }
1074 /// },
1075 /// );
1076 /// # });
1077 /// ```
1078 pub fn add_event_handler_context<T>(&self, ctx: T)
1079 where
1080 T: Clone + Send + Sync + 'static,
1081 {
1082 self.inner.event_handlers.add_context(ctx);
1083 }
1084
1085 /// Register a handler for a notification.
1086 ///
1087 /// Similar to [`Client::add_event_handler`], but only allows functions
1088 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1089 /// [`Client`] for now.
1090 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1091 where
1092 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1093 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1094 {
1095 self.inner.notification_handlers.write().await.push(Box::new(
1096 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1097 ));
1098
1099 self
1100 }
1101
1102 /// Subscribe to all updates for the room with the given ID.
1103 ///
1104 /// The returned receiver will receive a new message for each sync response
1105 /// that contains updates for that room.
1106 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1107 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1108 btree_map::Entry::Vacant(entry) => {
1109 let (tx, rx) = broadcast::channel(8);
1110 entry.insert(tx);
1111 rx
1112 }
1113 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1114 }
1115 }
1116
1117 /// Subscribe to all updates to all rooms, whenever any has been received in
1118 /// a sync response.
1119 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1120 self.inner.room_updates_sender.subscribe()
1121 }
1122
1123 pub(crate) async fn notification_handlers(
1124 &self,
1125 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1126 self.inner.notification_handlers.read().await
1127 }
1128
1129 /// Get all the rooms the client knows about.
1130 ///
1131 /// This will return the list of joined, invited, and left rooms.
1132 pub fn rooms(&self) -> Vec<Room> {
1133 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1134 }
1135
1136 /// Get all the rooms the client knows about, filtered by room state.
1137 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1138 self.base_client()
1139 .rooms_filtered(filter)
1140 .into_iter()
1141 .map(|room| Room::new(self.clone(), room))
1142 .collect()
1143 }
1144
1145 /// Get a stream of all the rooms, in addition to the existing rooms.
1146 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1147 let (rooms, stream) = self.base_client().rooms_stream();
1148
1149 let map_room = |room| Room::new(self.clone(), room);
1150
1151 (
1152 rooms.into_iter().map(map_room).collect(),
1153 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1154 )
1155 }
1156
1157 /// Returns the joined rooms this client knows about.
1158 pub fn joined_rooms(&self) -> Vec<Room> {
1159 self.base_client()
1160 .rooms_filtered(RoomStateFilter::JOINED)
1161 .into_iter()
1162 .map(|room| Room::new(self.clone(), room))
1163 .collect()
1164 }
1165
1166 /// Returns the invited rooms this client knows about.
1167 pub fn invited_rooms(&self) -> Vec<Room> {
1168 self.base_client()
1169 .rooms_filtered(RoomStateFilter::INVITED)
1170 .into_iter()
1171 .map(|room| Room::new(self.clone(), room))
1172 .collect()
1173 }
1174
1175 /// Returns the left rooms this client knows about.
1176 pub fn left_rooms(&self) -> Vec<Room> {
1177 self.base_client()
1178 .rooms_filtered(RoomStateFilter::LEFT)
1179 .into_iter()
1180 .map(|room| Room::new(self.clone(), room))
1181 .collect()
1182 }
1183
1184 /// Get a room with the given room id.
1185 ///
1186 /// # Arguments
1187 ///
1188 /// `room_id` - The unique id of the room that should be fetched.
1189 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1190 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1191 }
1192
1193 /// Gets the preview of a room, whether the current user has joined it or
1194 /// not.
1195 pub async fn get_room_preview(
1196 &self,
1197 room_or_alias_id: &RoomOrAliasId,
1198 via: Vec<OwnedServerName>,
1199 ) -> Result<RoomPreview> {
1200 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1201 Ok(room_id) => room_id.to_owned(),
1202 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1203 };
1204
1205 if let Some(room) = self.get_room(&room_id) {
1206 // The cached data can only be trusted if the room state is joined or
1207 // banned: for invite and knock rooms, no updates will be received
1208 // for the rooms after the invite/knock action took place so we may
1209 // have very out to date data for important fields such as
1210 // `join_rule`. For left rooms, the homeserver should return the latest info.
1211 match room.state() {
1212 RoomState::Joined | RoomState::Banned => {
1213 return Ok(RoomPreview::from_known_room(&room).await);
1214 }
1215 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1216 }
1217 }
1218
1219 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1220 }
1221
1222 /// Resolve a room alias to a room id and a list of servers which know
1223 /// about it.
1224 ///
1225 /// # Arguments
1226 ///
1227 /// `room_alias` - The room alias to be resolved.
1228 pub async fn resolve_room_alias(
1229 &self,
1230 room_alias: &RoomAliasId,
1231 ) -> HttpResult<get_alias::v3::Response> {
1232 let request = get_alias::v3::Request::new(room_alias.to_owned());
1233 self.send(request).await
1234 }
1235
1236 /// Checks if a room alias is not in use yet.
1237 ///
1238 /// Returns:
1239 /// - `Ok(true)` if the room alias is available.
1240 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1241 /// status code).
1242 /// - An `Err` otherwise.
1243 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1244 match self.resolve_room_alias(alias).await {
1245 // The room alias was resolved, so it's already in use.
1246 Ok(_) => Ok(false),
1247 Err(error) => {
1248 match error.client_api_error_kind() {
1249 // The room alias wasn't found, so it's available.
1250 Some(ErrorKind::NotFound) => Ok(true),
1251 _ => Err(error),
1252 }
1253 }
1254 }
1255 }
1256
1257 /// Adds a new room alias associated with a room to the room directory.
1258 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1259 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1260 self.send(request).await?;
1261 Ok(())
1262 }
1263
1264 /// Removes a room alias from the room directory.
1265 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1266 let request = delete_alias::v3::Request::new(alias.to_owned());
1267 self.send(request).await?;
1268 Ok(())
1269 }
1270
1271 /// Update the homeserver from the login response well-known if needed.
1272 ///
1273 /// # Arguments
1274 ///
1275 /// * `login_well_known` - The `well_known` field from a successful login
1276 /// response.
1277 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1278 if self.inner.respect_login_well_known {
1279 if let Some(well_known) = login_well_known {
1280 if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1281 self.set_homeserver(homeserver);
1282 }
1283 }
1284 }
1285 }
1286
1287 /// Similar to [`Client::restore_session_with`], with
1288 /// [`RoomLoadSettings::default()`].
1289 ///
1290 /// # Panics
1291 ///
1292 /// Panics if a session was already restored or logged in.
1293 #[instrument(skip_all)]
1294 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1295 self.restore_session_with(session, RoomLoadSettings::default()).await
1296 }
1297
1298 /// Restore a session previously logged-in using one of the available
1299 /// authentication APIs. The number of rooms to restore is controlled by
1300 /// [`RoomLoadSettings`].
1301 ///
1302 /// See the documentation of the corresponding authentication API's
1303 /// `restore_session` method for more information.
1304 ///
1305 /// # Panics
1306 ///
1307 /// Panics if a session was already restored or logged in.
1308 #[instrument(skip_all)]
1309 pub async fn restore_session_with(
1310 &self,
1311 session: impl Into<AuthSession>,
1312 room_load_settings: RoomLoadSettings,
1313 ) -> Result<()> {
1314 let session = session.into();
1315 match session {
1316 AuthSession::Matrix(session) => {
1317 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1318 }
1319 AuthSession::OAuth(session) => {
1320 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1321 }
1322 }
1323 }
1324
1325 /// Refresh the access token using the authentication API used to log into
1326 /// this session.
1327 ///
1328 /// See the documentation of the authentication API's `refresh_access_token`
1329 /// method for more information.
1330 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1331 let Some(auth_api) = self.auth_api() else {
1332 return Err(RefreshTokenError::RefreshTokenRequired);
1333 };
1334
1335 match auth_api {
1336 AuthApi::Matrix(api) => {
1337 trace!("Token refresh: Using the homeserver.");
1338 Box::pin(api.refresh_access_token()).await?;
1339 }
1340 AuthApi::OAuth(api) => {
1341 trace!("Token refresh: Using OAuth 2.0.");
1342 Box::pin(api.refresh_access_token()).await?;
1343 }
1344 }
1345
1346 Ok(())
1347 }
1348
1349 /// Log out the current session using the proper authentication API.
1350 ///
1351 /// # Errors
1352 ///
1353 /// Returns an error if the session is not authenticated or if an error
1354 /// occurred while making the request to the server.
1355 pub async fn logout(&self) -> Result<(), Error> {
1356 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1357 match auth_api {
1358 AuthApi::Matrix(matrix_auth) => {
1359 matrix_auth.logout().await?;
1360 Ok(())
1361 }
1362 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1363 }
1364 }
1365
1366 /// Get or upload a sync filter.
1367 ///
1368 /// This method will either get a filter ID from the store or upload the
1369 /// filter definition to the homeserver and return the new filter ID.
1370 ///
1371 /// # Arguments
1372 ///
1373 /// * `filter_name` - The unique name of the filter, this name will be used
1374 /// locally to store and identify the filter ID returned by the server.
1375 ///
1376 /// * `definition` - The filter definition that should be uploaded to the
1377 /// server if no filter ID can be found in the store.
1378 ///
1379 /// # Examples
1380 ///
1381 /// ```no_run
1382 /// # use matrix_sdk::{
1383 /// # Client, config::SyncSettings,
1384 /// # ruma::api::client::{
1385 /// # filter::{
1386 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1387 /// # },
1388 /// # sync::sync_events::v3::Filter,
1389 /// # }
1390 /// # };
1391 /// # use url::Url;
1392 /// # async {
1393 /// # let homeserver = Url::parse("http://example.com").unwrap();
1394 /// # let client = Client::new(homeserver).await.unwrap();
1395 /// let mut filter = FilterDefinition::default();
1396 ///
1397 /// // Let's enable member lazy loading.
1398 /// filter.room.state.lazy_load_options =
1399 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1400 ///
1401 /// let filter_id = client
1402 /// .get_or_upload_filter("sync", filter)
1403 /// .await
1404 /// .unwrap();
1405 ///
1406 /// let sync_settings = SyncSettings::new()
1407 /// .filter(Filter::FilterId(filter_id));
1408 ///
1409 /// let response = client.sync_once(sync_settings).await.unwrap();
1410 /// # };
1411 #[instrument(skip(self, definition))]
1412 pub async fn get_or_upload_filter(
1413 &self,
1414 filter_name: &str,
1415 definition: FilterDefinition,
1416 ) -> Result<String> {
1417 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1418 debug!("Found filter locally");
1419 Ok(filter)
1420 } else {
1421 debug!("Didn't find filter locally");
1422 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1423 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1424 let response = self.send(request).await?;
1425
1426 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1427
1428 Ok(response.filter_id)
1429 }
1430 }
1431
1432 /// Finish joining a room.
1433 ///
1434 /// If the room was an invite that should be marked as a DM, will include it
1435 /// in the DM event after creating the joined room.
1436 async fn finish_join_room(&self, room_id: &RoomId) -> Result<Room> {
1437 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1438 room.state() == RoomState::Invited
1439 && room.is_direct().await.unwrap_or_else(|e| {
1440 warn!(%room_id, "is_direct() failed: {e}");
1441 false
1442 })
1443 } else {
1444 false
1445 };
1446
1447 let base_room = self.base_client().room_joined(room_id).await?;
1448 let room = Room::new(self.clone(), base_room);
1449
1450 if mark_as_dm {
1451 room.set_is_direct(true).await?;
1452 }
1453
1454 Ok(room)
1455 }
1456
1457 /// Join a room by `RoomId`.
1458 ///
1459 /// Returns the `Room` in the joined state.
1460 ///
1461 /// # Arguments
1462 ///
1463 /// * `room_id` - The `RoomId` of the room to be joined.
1464 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1465 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1466 let response = self.send(request).await?;
1467 self.finish_join_room(&response.room_id).await
1468 }
1469
1470 /// Join a room by `RoomOrAliasId`.
1471 ///
1472 /// Returns the `Room` in the joined state.
1473 ///
1474 /// # Arguments
1475 ///
1476 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1477 /// alias looks like `#name:example.com`.
1478 /// * `server_names` - The server names to be used for resolving the alias,
1479 /// if needs be.
1480 pub async fn join_room_by_id_or_alias(
1481 &self,
1482 alias: &RoomOrAliasId,
1483 server_names: &[OwnedServerName],
1484 ) -> Result<Room> {
1485 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1486 via: server_names.to_owned(),
1487 });
1488 let response = self.send(request).await?;
1489 self.finish_join_room(&response.room_id).await
1490 }
1491
1492 /// Search the homeserver's directory of public rooms.
1493 ///
1494 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1495 /// a `get_public_rooms::Response`.
1496 ///
1497 /// # Arguments
1498 ///
1499 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1500 ///
1501 /// * `since` - Pagination token from a previous request.
1502 ///
1503 /// * `server` - The name of the server, if `None` the requested server is
1504 /// used.
1505 ///
1506 /// # Examples
1507 /// ```no_run
1508 /// use matrix_sdk::Client;
1509 /// # use url::Url;
1510 /// # let homeserver = Url::parse("http://example.com").unwrap();
1511 /// # let limit = Some(10);
1512 /// # let since = Some("since token");
1513 /// # let server = Some("servername.com".try_into().unwrap());
1514 /// # async {
1515 /// let mut client = Client::new(homeserver).await.unwrap();
1516 ///
1517 /// client.public_rooms(limit, since, server).await;
1518 /// # };
1519 /// ```
1520 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1521 pub async fn public_rooms(
1522 &self,
1523 limit: Option<u32>,
1524 since: Option<&str>,
1525 server: Option<&ServerName>,
1526 ) -> HttpResult<get_public_rooms::v3::Response> {
1527 let limit = limit.map(UInt::from);
1528
1529 let request = assign!(get_public_rooms::v3::Request::new(), {
1530 limit,
1531 since: since.map(ToOwned::to_owned),
1532 server: server.map(ToOwned::to_owned),
1533 });
1534 self.send(request).await
1535 }
1536
1537 /// Create a room with the given parameters.
1538 ///
1539 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1540 /// created room.
1541 ///
1542 /// If you want to create a direct message with one specific user, you can
1543 /// use [`create_dm`][Self::create_dm], which is more convenient than
1544 /// assembling the [`create_room::v3::Request`] yourself.
1545 ///
1546 /// If the `is_direct` field of the request is set to `true` and at least
1547 /// one user is invited, the room will be automatically added to the direct
1548 /// rooms in the account data.
1549 ///
1550 /// # Examples
1551 ///
1552 /// ```no_run
1553 /// use matrix_sdk::{
1554 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1555 /// Client,
1556 /// };
1557 /// # use url::Url;
1558 /// #
1559 /// # async {
1560 /// # let homeserver = Url::parse("http://example.com").unwrap();
1561 /// let request = CreateRoomRequest::new();
1562 /// let client = Client::new(homeserver).await.unwrap();
1563 /// assert!(client.create_room(request).await.is_ok());
1564 /// # };
1565 /// ```
1566 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1567 let invite = request.invite.clone();
1568 let is_direct_room = request.is_direct;
1569 let response = self.send(request).await?;
1570
1571 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1572
1573 let joined_room = Room::new(self.clone(), base_room);
1574
1575 if is_direct_room && !invite.is_empty() {
1576 if let Err(error) =
1577 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1578 {
1579 // FIXME: Retry in the background
1580 error!("Failed to mark room as DM: {error}");
1581 }
1582 }
1583
1584 Ok(joined_room)
1585 }
1586
1587 /// Create a DM room.
1588 ///
1589 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1590 /// given user being invited, the room marked `is_direct` and both the
1591 /// creator and invitee getting the default maximum power level.
1592 ///
1593 /// If the `e2e-encryption` feature is enabled, the room will also be
1594 /// encrypted.
1595 ///
1596 /// # Arguments
1597 ///
1598 /// * `user_id` - The ID of the user to create a DM for.
1599 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1600 #[cfg(feature = "e2e-encryption")]
1601 let initial_state =
1602 vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1603 .to_raw_any()];
1604
1605 #[cfg(not(feature = "e2e-encryption"))]
1606 let initial_state = vec![];
1607
1608 let request = assign!(create_room::v3::Request::new(), {
1609 invite: vec![user_id.to_owned()],
1610 is_direct: true,
1611 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1612 initial_state,
1613 });
1614
1615 self.create_room(request).await
1616 }
1617
1618 /// Search the homeserver's directory for public rooms with a filter.
1619 ///
1620 /// # Arguments
1621 ///
1622 /// * `room_search` - The easiest way to create this request is using the
1623 /// `get_public_rooms_filtered::Request` itself.
1624 ///
1625 /// # Examples
1626 ///
1627 /// ```no_run
1628 /// # use url::Url;
1629 /// # use matrix_sdk::Client;
1630 /// # async {
1631 /// # let homeserver = Url::parse("http://example.com")?;
1632 /// use matrix_sdk::ruma::{
1633 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1634 /// };
1635 /// # let mut client = Client::new(homeserver).await?;
1636 ///
1637 /// let mut filter = Filter::new();
1638 /// filter.generic_search_term = Some("rust".to_owned());
1639 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1640 /// request.filter = filter;
1641 ///
1642 /// let response = client.public_rooms_filtered(request).await?;
1643 ///
1644 /// for room in response.chunk {
1645 /// println!("Found room {room:?}");
1646 /// }
1647 /// # anyhow::Ok(()) };
1648 /// ```
1649 pub async fn public_rooms_filtered(
1650 &self,
1651 request: get_public_rooms_filtered::v3::Request,
1652 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1653 self.send(request).await
1654 }
1655
1656 /// Send an arbitrary request to the server, without updating client state.
1657 ///
1658 /// **Warning:** Because this method *does not* update the client state, it
1659 /// is important to make sure that you account for this yourself, and
1660 /// use wrapper methods where available. This method should *only* be
1661 /// used if a wrapper method for the endpoint you'd like to use is not
1662 /// available.
1663 ///
1664 /// # Arguments
1665 ///
1666 /// * `request` - A filled out and valid request for the endpoint to be hit
1667 ///
1668 /// * `timeout` - An optional request timeout setting, this overrides the
1669 /// default request setting if one was set.
1670 ///
1671 /// # Examples
1672 ///
1673 /// ```no_run
1674 /// # use matrix_sdk::{Client, config::SyncSettings};
1675 /// # use url::Url;
1676 /// # async {
1677 /// # let homeserver = Url::parse("http://localhost:8080")?;
1678 /// # let mut client = Client::new(homeserver).await?;
1679 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1680 ///
1681 /// // First construct the request you want to make
1682 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1683 /// // for all available Endpoints
1684 /// let user_id = user_id!("@example:localhost").to_owned();
1685 /// let request = profile::get_profile::v3::Request::new(user_id);
1686 ///
1687 /// // Start the request using Client::send()
1688 /// let response = client.send(request).await?;
1689 ///
1690 /// // Check the corresponding Response struct to find out what types are
1691 /// // returned
1692 /// # anyhow::Ok(()) };
1693 /// ```
1694 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1695 where
1696 Request: OutgoingRequest + Clone + Debug,
1697 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1698 {
1699 SendRequest {
1700 client: self.clone(),
1701 request,
1702 config: None,
1703 send_progress: Default::default(),
1704 }
1705 }
1706
1707 pub(crate) async fn send_inner<Request>(
1708 &self,
1709 request: Request,
1710 config: Option<RequestConfig>,
1711 send_progress: SharedObservable<TransmissionProgress>,
1712 ) -> HttpResult<Request::IncomingResponse>
1713 where
1714 Request: OutgoingRequest + Debug,
1715 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1716 {
1717 let homeserver = self.homeserver().to_string();
1718 let access_token = self.access_token();
1719
1720 self.inner
1721 .http_client
1722 .send(
1723 request,
1724 config,
1725 homeserver,
1726 access_token.as_deref(),
1727 &self.server_versions().await?,
1728 send_progress,
1729 )
1730 .await
1731 }
1732
1733 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1734 _ = self
1735 .inner
1736 .auth_ctx
1737 .session_change_sender
1738 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1739 }
1740
1741 /// Fetches server versions from network; no caching.
1742 pub async fn fetch_server_versions(
1743 &self,
1744 request_config: Option<RequestConfig>,
1745 ) -> HttpResult<get_supported_versions::Response> {
1746 let server_versions = self
1747 .inner
1748 .http_client
1749 .send(
1750 get_supported_versions::Request::new(),
1751 request_config,
1752 self.homeserver().to_string(),
1753 None,
1754 &[MatrixVersion::V1_0],
1755 Default::default(),
1756 )
1757 .await?;
1758
1759 Ok(server_versions)
1760 }
1761
1762 /// Fetches client well_known from network; no caching.
1763 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1764 let server_url_string = self
1765 .server()
1766 .unwrap_or(
1767 // Sometimes people configure their well-known directly on the homeserver so use
1768 // this as a fallback when the server name is unknown.
1769 &self.homeserver(),
1770 )
1771 .to_string();
1772
1773 let well_known = self
1774 .inner
1775 .http_client
1776 .send(
1777 discover_homeserver::Request::new(),
1778 Some(RequestConfig::short_retry()),
1779 server_url_string,
1780 None,
1781 &[MatrixVersion::V1_0],
1782 Default::default(),
1783 )
1784 .await;
1785
1786 match well_known {
1787 Ok(well_known) => Some(well_known),
1788 Err(http_error) => {
1789 // It is perfectly valid to not have a well-known file.
1790 // Maybe we should check for a specific error code to be sure?
1791 warn!("Failed to fetch client well-known: {http_error}");
1792 None
1793 }
1794 }
1795 }
1796
1797 /// Load server info from storage, or fetch them from network and cache
1798 /// them.
1799 async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1800 match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1801 Ok(Some(stored)) => {
1802 if let Some(server_info) =
1803 stored.into_server_info().and_then(|info| info.maybe_decode())
1804 {
1805 return Ok(server_info);
1806 }
1807 }
1808 Ok(None) => {
1809 // fallthrough: cache is empty
1810 }
1811 Err(err) => {
1812 warn!("error when loading cached server info: {err}");
1813 // fallthrough to network.
1814 }
1815 }
1816
1817 let server_versions = self.fetch_server_versions(None).await?;
1818 let well_known = self.fetch_client_well_known().await;
1819 let server_info = ServerInfo::new(
1820 server_versions.versions.clone(),
1821 server_versions.unstable_features.clone(),
1822 well_known.map(Into::into),
1823 );
1824
1825 // Attempt to cache the result in storage.
1826 {
1827 if let Err(err) = self
1828 .state_store()
1829 .set_kv_data(
1830 StateStoreDataKey::ServerInfo,
1831 StateStoreDataValue::ServerInfo(server_info.clone()),
1832 )
1833 .await
1834 {
1835 warn!("error when caching server info: {err}");
1836 }
1837 }
1838
1839 Ok(server_info)
1840 }
1841
1842 async fn get_or_load_and_cache_server_info<
1843 Value,
1844 MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
1845 >(
1846 &self,
1847 map: MapFunction,
1848 ) -> HttpResult<Value> {
1849 let server_info = &self.inner.caches.server_info;
1850 if let CachedValue::Cached(val) = map(&*server_info.read().await) {
1851 return Ok(val);
1852 }
1853
1854 let mut guarded_server_info = server_info.write().await;
1855 if let CachedValue::Cached(val) = map(&guarded_server_info) {
1856 return Ok(val);
1857 }
1858
1859 let server_info = self.load_or_fetch_server_info().await?;
1860
1861 // Fill both unstable features and server versions at once.
1862 let mut versions = server_info.known_versions();
1863 if versions.is_empty() {
1864 versions.push(MatrixVersion::V1_0);
1865 }
1866
1867 guarded_server_info.server_versions = CachedValue::Cached(versions.into());
1868 guarded_server_info.unstable_features = CachedValue::Cached(server_info.unstable_features);
1869 guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
1870
1871 // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
1872 // fetch an optional property), the function will always return some.
1873 Ok(map(&guarded_server_info).unwrap_cached_value())
1874 }
1875
1876 /// Get the Matrix versions supported by the homeserver by fetching them
1877 /// from the server or the cache.
1878 ///
1879 /// # Examples
1880 ///
1881 /// ```no_run
1882 /// use ruma::api::MatrixVersion;
1883 /// # use matrix_sdk::{Client, config::SyncSettings};
1884 /// # use url::Url;
1885 /// # async {
1886 /// # let homeserver = Url::parse("http://localhost:8080")?;
1887 /// # let mut client = Client::new(homeserver).await?;
1888 ///
1889 /// let server_versions = client.server_versions().await?;
1890 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
1891 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1892 /// # anyhow::Ok(()) };
1893 /// ```
1894 pub async fn server_versions(&self) -> HttpResult<Box<[MatrixVersion]>> {
1895 self.get_or_load_and_cache_server_info(|server_info| server_info.server_versions.clone())
1896 .await
1897 }
1898
1899 /// Get the unstable features supported by the homeserver by fetching them
1900 /// from the server or the cache.
1901 ///
1902 /// # Examples
1903 ///
1904 /// ```no_run
1905 /// # use matrix_sdk::{Client, config::SyncSettings};
1906 /// # use url::Url;
1907 /// # async {
1908 /// # let homeserver = Url::parse("http://localhost:8080")?;
1909 /// # let mut client = Client::new(homeserver).await?;
1910 /// let unstable_features = client.unstable_features().await?;
1911 /// let supports_msc_x =
1912 /// unstable_features.get("msc_x").copied().unwrap_or(false);
1913 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
1914 /// # anyhow::Ok(()) };
1915 /// ```
1916 pub async fn unstable_features(&self) -> HttpResult<BTreeMap<String, bool>> {
1917 self.get_or_load_and_cache_server_info(|server_info| server_info.unstable_features.clone())
1918 .await
1919 }
1920
1921 /// Get information about the homeserver's advertised RTC foci by fetching
1922 /// the well-known file from the server or the cache.
1923 ///
1924 /// # Examples
1925 /// ```no_run
1926 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
1927 /// # use url::Url;
1928 /// # async {
1929 /// # let homeserver = Url::parse("http://localhost:8080")?;
1930 /// # let mut client = Client::new(homeserver).await?;
1931 /// let rtc_foci = client.rtc_foci().await?;
1932 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
1933 /// RtcFocusInfo::LiveKit(info) => Some(info),
1934 /// _ => None,
1935 /// });
1936 /// if let Some(info) = default_livekit_focus_info {
1937 /// println!("Default LiveKit service URL: {}", info.service_url);
1938 /// }
1939 /// # anyhow::Ok(()) };
1940 /// ```
1941 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
1942 let well_known = self
1943 .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
1944 .await?;
1945
1946 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
1947 }
1948
1949 /// Empty the server version and unstable features cache.
1950 ///
1951 /// Since the SDK caches server info (versions, unstable features,
1952 /// well-known etc), it's possible to have a stale entry in the cache. This
1953 /// functions makes it possible to force reset it.
1954 pub async fn reset_server_info(&self) -> Result<()> {
1955 // Empty the in-memory caches.
1956 let mut guard = self.inner.caches.server_info.write().await;
1957 guard.server_versions = CachedValue::NotSet;
1958 guard.unstable_features = CachedValue::NotSet;
1959
1960 // Empty the store cache.
1961 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
1962 }
1963
1964 /// Check whether MSC 4028 is enabled on the homeserver.
1965 ///
1966 /// # Examples
1967 ///
1968 /// ```no_run
1969 /// # use matrix_sdk::{Client, config::SyncSettings};
1970 /// # use url::Url;
1971 /// # async {
1972 /// # let homeserver = Url::parse("http://localhost:8080")?;
1973 /// # let mut client = Client::new(homeserver).await?;
1974 /// let msc4028_enabled =
1975 /// client.can_homeserver_push_encrypted_event_to_device().await?;
1976 /// # anyhow::Ok(()) };
1977 /// ```
1978 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
1979 Ok(self.unstable_features().await?.get("org.matrix.msc4028").copied().unwrap_or(false))
1980 }
1981
1982 /// Get information of all our own devices.
1983 ///
1984 /// # Examples
1985 ///
1986 /// ```no_run
1987 /// # use matrix_sdk::{Client, config::SyncSettings};
1988 /// # use url::Url;
1989 /// # async {
1990 /// # let homeserver = Url::parse("http://localhost:8080")?;
1991 /// # let mut client = Client::new(homeserver).await?;
1992 /// let response = client.devices().await?;
1993 ///
1994 /// for device in response.devices {
1995 /// println!(
1996 /// "Device: {} {}",
1997 /// device.device_id,
1998 /// device.display_name.as_deref().unwrap_or("")
1999 /// );
2000 /// }
2001 /// # anyhow::Ok(()) };
2002 /// ```
2003 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2004 let request = get_devices::v3::Request::new();
2005
2006 self.send(request).await
2007 }
2008
2009 /// Delete the given devices from the server.
2010 ///
2011 /// # Arguments
2012 ///
2013 /// * `devices` - The list of devices that should be deleted from the
2014 /// server.
2015 ///
2016 /// * `auth_data` - This request requires user interactive auth, the first
2017 /// request needs to set this to `None` and will always fail with an
2018 /// `UiaaResponse`. The response will contain information for the
2019 /// interactive auth and the same request needs to be made but this time
2020 /// with some `auth_data` provided.
2021 ///
2022 /// ```no_run
2023 /// # use matrix_sdk::{
2024 /// # ruma::{api::client::uiaa, device_id},
2025 /// # Client, Error, config::SyncSettings,
2026 /// # };
2027 /// # use serde_json::json;
2028 /// # use url::Url;
2029 /// # use std::collections::BTreeMap;
2030 /// # async {
2031 /// # let homeserver = Url::parse("http://localhost:8080")?;
2032 /// # let mut client = Client::new(homeserver).await?;
2033 /// let devices = &[device_id!("DEVICEID").to_owned()];
2034 ///
2035 /// if let Err(e) = client.delete_devices(devices, None).await {
2036 /// if let Some(info) = e.as_uiaa_response() {
2037 /// let mut password = uiaa::Password::new(
2038 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2039 /// "wordpass".to_owned(),
2040 /// );
2041 /// password.session = info.session.clone();
2042 ///
2043 /// client
2044 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2045 /// .await?;
2046 /// }
2047 /// }
2048 /// # anyhow::Ok(()) };
2049 pub async fn delete_devices(
2050 &self,
2051 devices: &[OwnedDeviceId],
2052 auth_data: Option<uiaa::AuthData>,
2053 ) -> HttpResult<delete_devices::v3::Response> {
2054 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2055 request.auth = auth_data;
2056
2057 self.send(request).await
2058 }
2059
2060 /// Change the display name of a device owned by the current user.
2061 ///
2062 /// Returns a `update_device::Response` which specifies the result
2063 /// of the operation.
2064 ///
2065 /// # Arguments
2066 ///
2067 /// * `device_id` - The ID of the device to change the display name of.
2068 /// * `display_name` - The new display name to set.
2069 pub async fn rename_device(
2070 &self,
2071 device_id: &DeviceId,
2072 display_name: &str,
2073 ) -> HttpResult<update_device::v3::Response> {
2074 let mut request = update_device::v3::Request::new(device_id.to_owned());
2075 request.display_name = Some(display_name.to_owned());
2076
2077 self.send(request).await
2078 }
2079
2080 /// Synchronize the client's state with the latest state on the server.
2081 ///
2082 /// ## Syncing Events
2083 ///
2084 /// Messages or any other type of event need to be periodically fetched from
2085 /// the server, this is achieved by sending a `/sync` request to the server.
2086 ///
2087 /// The first sync is sent out without a [`token`]. The response of the
2088 /// first sync will contain a [`next_batch`] field which should then be
2089 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2090 /// don't receive the same events multiple times.
2091 ///
2092 /// ## Long Polling
2093 ///
2094 /// A sync should in the usual case always be in flight. The
2095 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2096 /// long the server will wait for new events before it will respond.
2097 /// The server will respond immediately if some new events arrive before the
2098 /// timeout has expired. If no changes arrive and the timeout expires an
2099 /// empty sync response will be sent to the client.
2100 ///
2101 /// This method of sending a request that may not receive a response
2102 /// immediately is called long polling.
2103 ///
2104 /// ## Filtering Events
2105 ///
2106 /// The number or type of messages and events that the client should receive
2107 /// from the server can be altered using a [`Filter`].
2108 ///
2109 /// Filters can be non-trivial and, since they will be sent with every sync
2110 /// request, they may take up a bunch of unnecessary bandwidth.
2111 ///
2112 /// Luckily filters can be uploaded to the server and reused using an unique
2113 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2114 /// method.
2115 ///
2116 /// # Arguments
2117 ///
2118 /// * `sync_settings` - Settings for the sync call, this allows us to set
2119 /// various options to configure the sync:
2120 /// * [`filter`] - To configure which events we receive and which get
2121 /// [filtered] by the server
2122 /// * [`timeout`] - To configure our [long polling] setup.
2123 /// * [`token`] - To tell the server which events we already received
2124 /// and where we wish to continue syncing.
2125 /// * [`full_state`] - To tell the server that we wish to receive all
2126 /// state events, regardless of our configured [`token`].
2127 /// * [`set_presence`] - To tell the server to set the presence and to
2128 /// which state.
2129 ///
2130 /// # Examples
2131 ///
2132 /// ```no_run
2133 /// # use url::Url;
2134 /// # async {
2135 /// # let homeserver = Url::parse("http://localhost:8080")?;
2136 /// # let username = "";
2137 /// # let password = "";
2138 /// use matrix_sdk::{
2139 /// config::SyncSettings,
2140 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2141 /// };
2142 ///
2143 /// let client = Client::new(homeserver).await?;
2144 /// client.matrix_auth().login_username(username, password).send().await?;
2145 ///
2146 /// // Sync once so we receive the client state and old messages.
2147 /// client.sync_once(SyncSettings::default()).await?;
2148 ///
2149 /// // Register our handler so we start responding once we receive a new
2150 /// // event.
2151 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2152 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2153 /// });
2154 ///
2155 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2156 /// // from our `sync_once()` call automatically.
2157 /// client.sync(SyncSettings::default()).await;
2158 /// # anyhow::Ok(()) };
2159 /// ```
2160 ///
2161 /// [`sync`]: #method.sync
2162 /// [`SyncSettings`]: crate::config::SyncSettings
2163 /// [`token`]: crate::config::SyncSettings#method.token
2164 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2165 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2166 /// [`set_presence`]: ruma::presence::PresenceState
2167 /// [`filter`]: crate::config::SyncSettings#method.filter
2168 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2169 /// [`next_batch`]: SyncResponse#structfield.next_batch
2170 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2171 /// [long polling]: #long-polling
2172 /// [filtered]: #filtering-events
2173 #[instrument(skip(self))]
2174 pub async fn sync_once(
2175 &self,
2176 sync_settings: crate::config::SyncSettings,
2177 ) -> Result<SyncResponse> {
2178 // The sync might not return for quite a while due to the timeout.
2179 // We'll see if there's anything crypto related to send out before we
2180 // sync, i.e. if we closed our client after a sync but before the
2181 // crypto requests were sent out.
2182 //
2183 // This will mostly be a no-op.
2184 #[cfg(feature = "e2e-encryption")]
2185 if let Err(e) = self.send_outgoing_requests().await {
2186 error!(error = ?e, "Error while sending outgoing E2EE requests");
2187 }
2188
2189 let request = assign!(sync_events::v3::Request::new(), {
2190 filter: sync_settings.filter.map(|f| *f),
2191 since: sync_settings.token,
2192 full_state: sync_settings.full_state,
2193 set_presence: sync_settings.set_presence,
2194 timeout: sync_settings.timeout,
2195 });
2196 let mut request_config = self.request_config();
2197 if let Some(timeout) = sync_settings.timeout {
2198 request_config.timeout += timeout;
2199 }
2200
2201 let response = self.send(request).with_request_config(request_config).await?;
2202 let next_batch = response.next_batch.clone();
2203 let response = self.process_sync(response).await?;
2204
2205 #[cfg(feature = "e2e-encryption")]
2206 if let Err(e) = self.send_outgoing_requests().await {
2207 error!(error = ?e, "Error while sending outgoing E2EE requests");
2208 }
2209
2210 self.inner.sync_beat.notify(usize::MAX);
2211
2212 Ok(SyncResponse::new(next_batch, response))
2213 }
2214
2215 /// Repeatedly synchronize the client state with the server.
2216 ///
2217 /// This method will only return on error, if cancellation is needed
2218 /// the method should be wrapped in a cancelable task or the
2219 /// [`Client::sync_with_callback`] method can be used or
2220 /// [`Client::sync_with_result_callback`] if you want to handle error
2221 /// cases in the loop, too.
2222 ///
2223 /// This method will internally call [`Client::sync_once`] in a loop.
2224 ///
2225 /// This method can be used with the [`Client::add_event_handler`]
2226 /// method to react to individual events. If you instead wish to handle
2227 /// events in a bulk manner the [`Client::sync_with_callback`],
2228 /// [`Client::sync_with_result_callback`] and
2229 /// [`Client::sync_stream`] methods can be used instead. Those methods
2230 /// repeatedly return the whole sync response.
2231 ///
2232 /// # Arguments
2233 ///
2234 /// * `sync_settings` - Settings for the sync call. *Note* that those
2235 /// settings will be only used for the first sync call. See the argument
2236 /// docs for [`Client::sync_once`] for more info.
2237 ///
2238 /// # Return
2239 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2240 /// up to the user of the API to check the error and decide whether the sync
2241 /// should continue or not.
2242 ///
2243 /// # Examples
2244 ///
2245 /// ```no_run
2246 /// # use url::Url;
2247 /// # async {
2248 /// # let homeserver = Url::parse("http://localhost:8080")?;
2249 /// # let username = "";
2250 /// # let password = "";
2251 /// use matrix_sdk::{
2252 /// config::SyncSettings,
2253 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2254 /// };
2255 ///
2256 /// let client = Client::new(homeserver).await?;
2257 /// client.matrix_auth().login_username(&username, &password).send().await?;
2258 ///
2259 /// // Register our handler so we start responding once we receive a new
2260 /// // event.
2261 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2262 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2263 /// });
2264 ///
2265 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2266 /// // automatically.
2267 /// client.sync(SyncSettings::default()).await?;
2268 /// # anyhow::Ok(()) };
2269 /// ```
2270 ///
2271 /// [argument docs]: #method.sync_once
2272 /// [`sync_with_callback`]: #method.sync_with_callback
2273 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2274 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2275 }
2276
2277 /// Repeatedly call sync to synchronize the client state with the server.
2278 ///
2279 /// # Arguments
2280 ///
2281 /// * `sync_settings` - Settings for the sync call. *Note* that those
2282 /// settings will be only used for the first sync call. See the argument
2283 /// docs for [`Client::sync_once`] for more info.
2284 ///
2285 /// * `callback` - A callback that will be called every time a successful
2286 /// response has been fetched from the server. The callback must return a
2287 /// boolean which signalizes if the method should stop syncing. If the
2288 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2289 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2290 ///
2291 /// # Return
2292 /// The sync runs until an error occurs or the
2293 /// callback indicates that the Loop should stop. If the callback asked for
2294 /// a regular stop, the result will be `Ok(())` otherwise the
2295 /// `Err(Error)` is returned.
2296 ///
2297 /// # Examples
2298 ///
2299 /// The following example demonstrates how to sync forever while sending all
2300 /// the interesting events through a mpsc channel to another thread e.g. a
2301 /// UI thread.
2302 ///
2303 /// ```no_run
2304 /// # use std::time::Duration;
2305 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2306 /// # use url::Url;
2307 /// # async {
2308 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2309 /// # let mut client = Client::new(homeserver).await.unwrap();
2310 ///
2311 /// use tokio::sync::mpsc::channel;
2312 ///
2313 /// let (tx, rx) = channel(100);
2314 ///
2315 /// let sync_channel = &tx;
2316 /// let sync_settings = SyncSettings::new()
2317 /// .timeout(Duration::from_secs(30));
2318 ///
2319 /// client
2320 /// .sync_with_callback(sync_settings, |response| async move {
2321 /// let channel = sync_channel;
2322 /// for (room_id, room) in response.rooms.joined {
2323 /// for event in room.timeline.events {
2324 /// channel.send(event).await.unwrap();
2325 /// }
2326 /// }
2327 ///
2328 /// LoopCtrl::Continue
2329 /// })
2330 /// .await;
2331 /// };
2332 /// ```
2333 #[instrument(skip_all)]
2334 pub async fn sync_with_callback<C>(
2335 &self,
2336 sync_settings: crate::config::SyncSettings,
2337 callback: impl Fn(SyncResponse) -> C,
2338 ) -> Result<(), Error>
2339 where
2340 C: Future<Output = LoopCtrl>,
2341 {
2342 self.sync_with_result_callback(sync_settings, |result| async {
2343 Ok(callback(result?).await)
2344 })
2345 .await
2346 }
2347
2348 /// Repeatedly call sync to synchronize the client state with the server.
2349 ///
2350 /// # Arguments
2351 ///
2352 /// * `sync_settings` - Settings for the sync call. *Note* that those
2353 /// settings will be only used for the first sync call. See the argument
2354 /// docs for [`Client::sync_once`] for more info.
2355 ///
2356 /// * `callback` - A callback that will be called every time after a
2357 /// response has been received, failure or not. The callback returns a
2358 /// `Result<LoopCtrl, Error>`, too. When returning
2359 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2360 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2361 /// function returns `Ok(())`. In case the callback can't handle the
2362 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2363 /// which results in the sync ending and the `Err(Error)` being returned.
2364 ///
2365 /// # Return
2366 /// The sync runs until an error occurs that the callback can't handle or
2367 /// the callback indicates that the Loop should stop. If the callback
2368 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2369 /// `Err(Error)` is returned.
2370 ///
2371 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2372 /// this, and are handled first without sending the result to the
2373 /// callback. Only after they have exceeded is the `Result` handed to
2374 /// the callback.
2375 ///
2376 /// # Examples
2377 ///
2378 /// The following example demonstrates how to sync forever while sending all
2379 /// the interesting events through a mpsc channel to another thread e.g. a
2380 /// UI thread.
2381 ///
2382 /// ```no_run
2383 /// # use std::time::Duration;
2384 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2385 /// # use url::Url;
2386 /// # async {
2387 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2388 /// # let mut client = Client::new(homeserver).await.unwrap();
2389 /// #
2390 /// use tokio::sync::mpsc::channel;
2391 ///
2392 /// let (tx, rx) = channel(100);
2393 ///
2394 /// let sync_channel = &tx;
2395 /// let sync_settings = SyncSettings::new()
2396 /// .timeout(Duration::from_secs(30));
2397 ///
2398 /// client
2399 /// .sync_with_result_callback(sync_settings, |response| async move {
2400 /// let channel = sync_channel;
2401 /// let sync_response = response?;
2402 /// for (room_id, room) in sync_response.rooms.joined {
2403 /// for event in room.timeline.events {
2404 /// channel.send(event).await.unwrap();
2405 /// }
2406 /// }
2407 ///
2408 /// Ok(LoopCtrl::Continue)
2409 /// })
2410 /// .await;
2411 /// };
2412 /// ```
2413 #[instrument(skip(self, callback))]
2414 pub async fn sync_with_result_callback<C>(
2415 &self,
2416 mut sync_settings: crate::config::SyncSettings,
2417 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2418 ) -> Result<(), Error>
2419 where
2420 C: Future<Output = Result<LoopCtrl, Error>>,
2421 {
2422 let mut last_sync_time: Option<Instant> = None;
2423
2424 if sync_settings.token.is_none() {
2425 sync_settings.token = self.sync_token().await;
2426 }
2427
2428 loop {
2429 trace!("Syncing");
2430 let result = self.sync_loop_helper(&mut sync_settings).await;
2431
2432 trace!("Running callback");
2433 if callback(result).await? == LoopCtrl::Break {
2434 trace!("Callback told us to stop");
2435 break;
2436 }
2437 trace!("Done running callback");
2438
2439 Client::delay_sync(&mut last_sync_time).await
2440 }
2441
2442 Ok(())
2443 }
2444
2445 //// Repeatedly synchronize the client state with the server.
2446 ///
2447 /// This method will internally call [`Client::sync_once`] in a loop and is
2448 /// equivalent to the [`Client::sync`] method but the responses are provided
2449 /// as an async stream.
2450 ///
2451 /// # Arguments
2452 ///
2453 /// * `sync_settings` - Settings for the sync call. *Note* that those
2454 /// settings will be only used for the first sync call. See the argument
2455 /// docs for [`Client::sync_once`] for more info.
2456 ///
2457 /// # Examples
2458 ///
2459 /// ```no_run
2460 /// # use url::Url;
2461 /// # async {
2462 /// # let homeserver = Url::parse("http://localhost:8080")?;
2463 /// # let username = "";
2464 /// # let password = "";
2465 /// use futures_util::StreamExt;
2466 /// use matrix_sdk::{config::SyncSettings, Client};
2467 ///
2468 /// let client = Client::new(homeserver).await?;
2469 /// client.matrix_auth().login_username(&username, &password).send().await?;
2470 ///
2471 /// let mut sync_stream =
2472 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2473 ///
2474 /// while let Some(Ok(response)) = sync_stream.next().await {
2475 /// for room in response.rooms.joined.values() {
2476 /// for e in &room.timeline.events {
2477 /// if let Ok(event) = e.raw().deserialize() {
2478 /// println!("Received event {:?}", event);
2479 /// }
2480 /// }
2481 /// }
2482 /// }
2483 ///
2484 /// # anyhow::Ok(()) };
2485 /// ```
2486 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2487 #[instrument(skip(self))]
2488 pub async fn sync_stream(
2489 &self,
2490 mut sync_settings: crate::config::SyncSettings,
2491 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2492 let mut last_sync_time: Option<Instant> = None;
2493
2494 if sync_settings.token.is_none() {
2495 sync_settings.token = self.sync_token().await;
2496 }
2497
2498 let parent_span = Span::current();
2499
2500 async_stream::stream! {
2501 loop {
2502 yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2503
2504 Client::delay_sync(&mut last_sync_time).await
2505 }
2506 }
2507 }
2508
2509 /// Get the current, if any, sync token of the client.
2510 /// This will be None if the client didn't sync at least once.
2511 pub(crate) async fn sync_token(&self) -> Option<String> {
2512 self.inner.base_client.sync_token().await
2513 }
2514
2515 /// Gets information about the owner of a given access token.
2516 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2517 let request = whoami::v3::Request::new();
2518 self.send(request).await
2519 }
2520
2521 /// Subscribes a new receiver to client SessionChange broadcasts.
2522 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2523 let broadcast = &self.auth_ctx().session_change_sender;
2524 broadcast.subscribe()
2525 }
2526
2527 /// Sets the save/restore session callbacks.
2528 ///
2529 /// This is another mechanism to get synchronous updates to session tokens,
2530 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2531 pub fn set_session_callbacks(
2532 &self,
2533 reload_session_callback: Box<ReloadSessionCallback>,
2534 save_session_callback: Box<SaveSessionCallback>,
2535 ) -> Result<()> {
2536 self.inner
2537 .auth_ctx
2538 .reload_session_callback
2539 .set(reload_session_callback)
2540 .map_err(|_| Error::MultipleSessionCallbacks)?;
2541
2542 self.inner
2543 .auth_ctx
2544 .save_session_callback
2545 .set(save_session_callback)
2546 .map_err(|_| Error::MultipleSessionCallbacks)?;
2547
2548 Ok(())
2549 }
2550
2551 /// Get the notification settings of the current owner of the client.
2552 pub async fn notification_settings(&self) -> NotificationSettings {
2553 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2554 NotificationSettings::new(self.clone(), ruleset)
2555 }
2556
2557 /// Create a new specialized `Client` that can process notifications.
2558 ///
2559 /// See [`CrossProcessStoreLock::new`] to learn more about
2560 /// `cross_process_store_locks_holder_name`.
2561 ///
2562 /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2563 pub async fn notification_client(
2564 &self,
2565 cross_process_store_locks_holder_name: String,
2566 ) -> Result<Client> {
2567 let client = Client {
2568 inner: ClientInner::new(
2569 self.inner.auth_ctx.clone(),
2570 self.server().cloned(),
2571 self.homeserver(),
2572 self.sliding_sync_version(),
2573 self.inner.http_client.clone(),
2574 self.inner
2575 .base_client
2576 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2577 .await?,
2578 self.inner.caches.server_info.read().await.clone(),
2579 self.inner.respect_login_well_known,
2580 self.inner.event_cache.clone(),
2581 self.inner.send_queue_data.clone(),
2582 #[cfg(feature = "e2e-encryption")]
2583 self.inner.e2ee.encryption_settings,
2584 #[cfg(feature = "e2e-encryption")]
2585 self.inner.enable_share_history_on_invite,
2586 cross_process_store_locks_holder_name,
2587 )
2588 .await,
2589 };
2590
2591 Ok(client)
2592 }
2593
2594 /// The [`EventCache`] instance for this [`Client`].
2595 pub fn event_cache(&self) -> &EventCache {
2596 // SAFETY: always initialized in the `Client` ctor.
2597 self.inner.event_cache.get().unwrap()
2598 }
2599
2600 /// Waits until an at least partially synced room is received, and returns
2601 /// it.
2602 ///
2603 /// **Note: this function will loop endlessly until either it finds the room
2604 /// or an externally set timeout happens.**
2605 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2606 loop {
2607 if let Some(room) = self.get_room(room_id) {
2608 if room.is_state_partially_or_fully_synced() {
2609 debug!("Found just created room!");
2610 return room;
2611 }
2612 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2613 } else {
2614 debug!("Room wasn't found, waiting for sync beat to try again");
2615 }
2616 self.inner.sync_beat.listen().await;
2617 }
2618 }
2619
2620 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2621 /// join it.
2622 pub async fn knock(
2623 &self,
2624 room_id_or_alias: OwnedRoomOrAliasId,
2625 reason: Option<String>,
2626 server_names: Vec<OwnedServerName>,
2627 ) -> Result<Room> {
2628 let request =
2629 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2630 let response = self.send(request).await?;
2631 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2632 Ok(Room::new(self.clone(), base_room))
2633 }
2634
2635 /// Checks whether the provided `user_id` belongs to an ignored user.
2636 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2637 self.base_client().is_user_ignored(user_id).await
2638 }
2639
2640 /// Gets the `max_upload_size` value from the homeserver, getting either a
2641 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2642 /// missing.
2643 ///
2644 /// Check the spec for more info:
2645 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2646 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2647 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2648 if let Some(data) = max_upload_size_lock.get() {
2649 return Ok(data.to_owned());
2650 }
2651
2652 let response = self
2653 .send(ruma::api::client::authenticated_media::get_media_config::v1::Request::default())
2654 .await?;
2655
2656 match max_upload_size_lock.set(response.upload_size) {
2657 Ok(_) => Ok(response.upload_size),
2658 Err(error) => {
2659 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2660 }
2661 }
2662 }
2663}
2664
2665/// A weak reference to the inner client, useful when trying to get a handle
2666/// on the owning client.
2667#[derive(Clone)]
2668pub(crate) struct WeakClient {
2669 client: Weak<ClientInner>,
2670}
2671
2672impl WeakClient {
2673 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2674 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2675 Self { client: Arc::downgrade(client) }
2676 }
2677
2678 /// Construct a [`WeakClient`] from a [`Client`].
2679 pub fn from_client(client: &Client) -> Self {
2680 Self::from_inner(&client.inner)
2681 }
2682
2683 /// Attempts to get a [`Client`] from this [`WeakClient`].
2684 pub fn get(&self) -> Option<Client> {
2685 self.client.upgrade().map(|inner| Client { inner })
2686 }
2687
2688 /// Gets the number of strong (`Arc`) pointers still pointing to this
2689 /// client.
2690 #[allow(dead_code)]
2691 pub fn strong_count(&self) -> usize {
2692 self.client.strong_count()
2693 }
2694}
2695
2696#[derive(Clone)]
2697struct ClientServerInfo {
2698 /// The Matrix versions the server supports (known ones only).
2699 server_versions: CachedValue<Box<[MatrixVersion]>>,
2700
2701 /// The unstable features and their on/off state on the server.
2702 unstable_features: CachedValue<BTreeMap<String, bool>>,
2703
2704 /// The server's well-known file, if any.
2705 well_known: CachedValue<Option<WellKnownResponse>>,
2706}
2707
2708/// A cached value that can either be set or not set, used to avoid confusion
2709/// between a value that is set to `None` (because it doesn't exist) and a value
2710/// that has not been cached yet.
2711#[derive(Clone)]
2712enum CachedValue<Value> {
2713 /// A value has been cached.
2714 Cached(Value),
2715 /// Nothing has been cached yet.
2716 NotSet,
2717}
2718
2719impl<Value> CachedValue<Value> {
2720 /// Unwraps the cached value, returning it if it exists.
2721 ///
2722 /// # Panics
2723 ///
2724 /// If the cached value is not set, this will panic.
2725 fn unwrap_cached_value(self) -> Value {
2726 match self {
2727 CachedValue::Cached(value) => value,
2728 CachedValue::NotSet => panic!("Tried to unwrap a cached value that wasn't set"),
2729 }
2730 }
2731}
2732
2733// The http mocking library is not supported for wasm32
2734#[cfg(all(test, not(target_family = "wasm")))]
2735pub(crate) mod tests {
2736 use std::{sync::Arc, time::Duration};
2737
2738 use assert_matches::assert_matches;
2739 use assert_matches2::assert_let;
2740 use eyeball::SharedObservable;
2741 use futures_util::{pin_mut, FutureExt};
2742 use js_int::{uint, UInt};
2743 use matrix_sdk_base::{
2744 store::{MemoryStore, StoreConfig},
2745 RoomState,
2746 };
2747 use matrix_sdk_test::{
2748 async_test, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
2749 SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
2750 };
2751 #[cfg(target_family = "wasm")]
2752 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2753
2754 use ruma::{
2755 api::{
2756 client::{
2757 discovery::discover_homeserver::RtcFocusInfo,
2758 room::create_room::v3::Request as CreateRoomRequest,
2759 },
2760 MatrixVersion,
2761 },
2762 assign,
2763 events::{
2764 ignored_user_list::IgnoredUserListEventContent,
2765 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
2766 },
2767 owned_room_id, room_alias_id, room_id, RoomId, ServerName, UserId,
2768 };
2769 use serde_json::json;
2770 use stream_assert::{assert_next_matches, assert_pending};
2771 use tokio::{
2772 spawn,
2773 time::{sleep, timeout},
2774 };
2775 use url::Url;
2776 use wiremock::{
2777 matchers::{body_json, header, method, path, query_param_is_missing},
2778 Mock, MockServer, ResponseTemplate,
2779 };
2780
2781 use super::Client;
2782 use crate::{
2783 client::{futures::SendMediaUploadRequest, WeakClient},
2784 config::{RequestConfig, SyncSettings},
2785 futures::SendRequest,
2786 media::MediaError,
2787 test_utils::{
2788 logged_in_client, mocks::MatrixMockServer, no_retry_test_client, set_client_session,
2789 test_client_builder, test_client_builder_with_server,
2790 },
2791 Error, TransmissionProgress,
2792 };
2793
2794 #[async_test]
2795 async fn test_account_data() {
2796 let server = MockServer::start().await;
2797 let client = logged_in_client(Some(server.uri())).await;
2798
2799 Mock::given(method("GET"))
2800 .and(path("/_matrix/client/r0/sync".to_owned()))
2801 .and(header("authorization", "Bearer 1234"))
2802 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::SYNC))
2803 .mount(&server)
2804 .await;
2805
2806 let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
2807 let _response = client.sync_once(sync_settings).await.unwrap();
2808
2809 let content = client
2810 .account()
2811 .account_data::<IgnoredUserListEventContent>()
2812 .await
2813 .unwrap()
2814 .unwrap()
2815 .deserialize()
2816 .unwrap();
2817
2818 assert_eq!(content.ignored_users.len(), 1);
2819 }
2820
2821 #[async_test]
2822 async fn test_successful_discovery() {
2823 // Imagine this is `matrix.org`.
2824 let server = MockServer::start().await;
2825
2826 // Imagine this is `matrix-client.matrix.org`.
2827 let homeserver = MockServer::start().await;
2828
2829 // Imagine Alice has the user ID `@alice:matrix.org`.
2830 let server_url = server.uri();
2831 let domain = server_url.strip_prefix("http://").unwrap();
2832 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2833
2834 // The `.well-known` is on the server (e.g. `matrix.org`).
2835 Mock::given(method("GET"))
2836 .and(path("/.well-known/matrix/client"))
2837 .respond_with(ResponseTemplate::new(200).set_body_raw(
2838 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", &homeserver.uri()),
2839 "application/json",
2840 ))
2841 .mount(&server)
2842 .await;
2843
2844 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
2845 Mock::given(method("GET"))
2846 .and(path("/_matrix/client/versions"))
2847 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2848 .mount(&homeserver)
2849 .await;
2850
2851 let client = Client::builder()
2852 .insecure_server_name_no_tls(alice.server_name())
2853 .build()
2854 .await
2855 .unwrap();
2856
2857 assert_eq!(client.server().unwrap(), &Url::parse(&server.uri()).unwrap());
2858 assert_eq!(client.homeserver(), Url::parse(&homeserver.uri()).unwrap());
2859 }
2860
2861 #[async_test]
2862 async fn test_discovery_broken_server() {
2863 let server = MockServer::start().await;
2864 let server_url = server.uri();
2865 let domain = server_url.strip_prefix("http://").unwrap();
2866 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2867
2868 Mock::given(method("GET"))
2869 .and(path("/.well-known/matrix/client"))
2870 .respond_with(ResponseTemplate::new(404))
2871 .mount(&server)
2872 .await;
2873
2874 assert!(
2875 Client::builder()
2876 .insecure_server_name_no_tls(alice.server_name())
2877 .build()
2878 .await
2879 .is_err(),
2880 "Creating a client from a user ID should fail when the .well-known request fails."
2881 );
2882 }
2883
2884 #[async_test]
2885 async fn test_room_creation() {
2886 let server = MockServer::start().await;
2887 let client = logged_in_client(Some(server.uri())).await;
2888
2889 let response = SyncResponseBuilder::default()
2890 .add_joined_room(
2891 JoinedRoomBuilder::default()
2892 .add_state_event(StateTestEvent::Member)
2893 .add_state_event(StateTestEvent::PowerLevels),
2894 )
2895 .build_sync_response();
2896
2897 client.inner.base_client.receive_sync_response(response).await.unwrap();
2898
2899 assert_eq!(client.homeserver(), Url::parse(&server.uri()).unwrap());
2900
2901 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
2902 assert_eq!(room.state(), RoomState::Joined);
2903 }
2904
2905 #[async_test]
2906 async fn test_retry_limit_http_requests() {
2907 let server = MockServer::start().await;
2908 let client = test_client_builder(Some(server.uri()))
2909 .request_config(RequestConfig::new().retry_limit(3))
2910 .build()
2911 .await
2912 .unwrap();
2913
2914 assert!(client.request_config().retry_limit.unwrap() == 3);
2915
2916 Mock::given(method("POST"))
2917 .and(path("/_matrix/client/r0/login"))
2918 .respond_with(ResponseTemplate::new(501))
2919 .expect(3)
2920 .mount(&server)
2921 .await;
2922
2923 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2924 }
2925
2926 #[async_test]
2927 async fn test_retry_timeout_http_requests() {
2928 // Keep this timeout small so that the test doesn't take long
2929 let retry_timeout = Duration::from_secs(5);
2930 let server = MockServer::start().await;
2931 let client = test_client_builder(Some(server.uri()))
2932 .request_config(RequestConfig::new().max_retry_time(retry_timeout))
2933 .build()
2934 .await
2935 .unwrap();
2936
2937 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
2938
2939 Mock::given(method("POST"))
2940 .and(path("/_matrix/client/r0/login"))
2941 .respond_with(ResponseTemplate::new(501))
2942 .expect(2..)
2943 .mount(&server)
2944 .await;
2945
2946 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2947 }
2948
2949 #[async_test]
2950 async fn test_short_retry_initial_http_requests() {
2951 let server = MockServer::start().await;
2952 let client = test_client_builder(Some(server.uri())).build().await.unwrap();
2953
2954 Mock::given(method("POST"))
2955 .and(path("/_matrix/client/r0/login"))
2956 .respond_with(ResponseTemplate::new(501))
2957 .expect(3..)
2958 .mount(&server)
2959 .await;
2960
2961 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2962 }
2963
2964 #[async_test]
2965 async fn test_no_retry_http_requests() {
2966 let server = MockServer::start().await;
2967 let client = logged_in_client(Some(server.uri())).await;
2968
2969 Mock::given(method("GET"))
2970 .and(path("/_matrix/client/r0/devices"))
2971 .respond_with(ResponseTemplate::new(501))
2972 .expect(1)
2973 .mount(&server)
2974 .await;
2975
2976 client.devices().await.unwrap_err();
2977 }
2978
2979 #[async_test]
2980 async fn test_set_homeserver() {
2981 let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2982 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
2983
2984 let homeserver = Url::parse("http://example.com/").unwrap();
2985 client.set_homeserver(homeserver.clone());
2986 assert_eq!(client.homeserver(), homeserver);
2987 }
2988
2989 #[async_test]
2990 async fn test_search_user_request() {
2991 let server = MockServer::start().await;
2992 let client = logged_in_client(Some(server.uri())).await;
2993
2994 Mock::given(method("POST"))
2995 .and(path("_matrix/client/r0/user_directory/search"))
2996 .and(body_json(&*test_json::search_users::SEARCH_USERS_REQUEST))
2997 .respond_with(
2998 ResponseTemplate::new(200)
2999 .set_body_json(&*test_json::search_users::SEARCH_USERS_RESPONSE),
3000 )
3001 .mount(&server)
3002 .await;
3003
3004 let response = client.search_users("test", 50).await.unwrap();
3005 assert_eq!(response.results.len(), 1);
3006 let result = &response.results[0];
3007 assert_eq!(result.user_id.to_string(), "@test:example.me");
3008 assert_eq!(result.display_name.clone().unwrap(), "Test");
3009 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3010 assert!(!response.limited);
3011 }
3012
3013 #[async_test]
3014 async fn test_request_unstable_features() {
3015 let server = MockServer::start().await;
3016 let client = logged_in_client(Some(server.uri())).await;
3017
3018 Mock::given(method("GET"))
3019 .and(path("_matrix/client/versions"))
3020 .respond_with(
3021 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
3022 )
3023 .mount(&server)
3024 .await;
3025
3026 let unstable_features = client.unstable_features().await.unwrap();
3027 assert_eq!(unstable_features.get("org.matrix.e2e_cross_signing"), Some(&true));
3028 assert_eq!(unstable_features.get("you.shall.pass"), None);
3029 }
3030
3031 #[async_test]
3032 async fn test_can_homeserver_push_encrypted_event_to_device() {
3033 let server = MockServer::start().await;
3034 let client = logged_in_client(Some(server.uri())).await;
3035
3036 Mock::given(method("GET"))
3037 .and(path("_matrix/client/versions"))
3038 .respond_with(
3039 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
3040 )
3041 .mount(&server)
3042 .await;
3043
3044 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3045 assert!(msc4028_enabled);
3046 }
3047
3048 #[async_test]
3049 async fn test_recently_visited_rooms() {
3050 // Tracking recently visited rooms requires authentication
3051 let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
3052 assert_matches!(
3053 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3054 Err(Error::AuthenticationRequired)
3055 );
3056
3057 let client = logged_in_client(None).await;
3058 let account = client.account();
3059
3060 // We should start off with an empty list
3061 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3062
3063 // Tracking a valid room id should add it to the list
3064 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3065 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3066 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3067
3068 // And the existing list shouldn't be changed
3069 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3070 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3071
3072 // Tracking the same room again shouldn't change the list
3073 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3074 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3075 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3076
3077 // Tracking a second room should add it to the front of the list
3078 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3079 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3080 assert_eq!(
3081 account.get_recently_visited_rooms().await.unwrap(),
3082 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3083 );
3084
3085 // Tracking the first room yet again should move it to the front of the list
3086 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3087 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3088 assert_eq!(
3089 account.get_recently_visited_rooms().await.unwrap(),
3090 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3091 );
3092
3093 // Tracking should be capped at 20
3094 for n in 0..20 {
3095 account
3096 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3097 .await
3098 .unwrap();
3099 }
3100
3101 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3102
3103 // And the initial rooms should've been pushed out
3104 let rooms = account.get_recently_visited_rooms().await.unwrap();
3105 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3106 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3107
3108 // And the last tracked room should be the first
3109 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3110 }
3111
3112 #[async_test]
3113 async fn test_client_no_cycle_with_event_cache() {
3114 let client = logged_in_client(None).await;
3115
3116 // Wait for the init tasks to die.
3117 sleep(Duration::from_secs(1)).await;
3118
3119 let weak_client = WeakClient::from_client(&client);
3120 assert_eq!(weak_client.strong_count(), 1);
3121
3122 {
3123 let room_id = room_id!("!room:example.org");
3124
3125 // Have the client know the room.
3126 let response = SyncResponseBuilder::default()
3127 .add_joined_room(JoinedRoomBuilder::new(room_id))
3128 .build_sync_response();
3129 client.inner.base_client.receive_sync_response(response).await.unwrap();
3130
3131 client.event_cache().subscribe().unwrap();
3132
3133 let (_room_event_cache, _drop_handles) =
3134 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3135 }
3136
3137 drop(client);
3138
3139 // Give a bit of time for background tasks to die.
3140 sleep(Duration::from_secs(1)).await;
3141
3142 // The weak client must be the last reference to the client now.
3143 assert_eq!(weak_client.strong_count(), 0);
3144 let client = weak_client.get();
3145 assert!(
3146 client.is_none(),
3147 "too many strong references to the client: {}",
3148 Arc::strong_count(&client.unwrap().inner)
3149 );
3150 }
3151
3152 #[async_test]
3153 async fn test_server_info_caching() {
3154 let server = MockServer::start().await;
3155 let server_url = server.uri();
3156 let domain = server_url.strip_prefix("http://").unwrap();
3157 let server_name = <&ServerName>::try_from(domain).unwrap();
3158 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3159
3160 let well_known_mock = Mock::given(method("GET"))
3161 .and(path("/.well-known/matrix/client"))
3162 .respond_with(ResponseTemplate::new(200).set_body_raw(
3163 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", server_url.as_ref()),
3164 "application/json",
3165 ))
3166 .named("well known mock")
3167 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3168 .mount_as_scoped(&server)
3169 .await;
3170
3171 let versions_mock = Mock::given(method("GET"))
3172 .and(path("/_matrix/client/versions"))
3173 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3174 .named("first versions mock")
3175 .expect(1)
3176 .mount_as_scoped(&server)
3177 .await;
3178
3179 let memory_store = Arc::new(MemoryStore::new());
3180 let client = Client::builder()
3181 .insecure_server_name_no_tls(server_name)
3182 .store_config(
3183 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3184 .state_store(memory_store.clone()),
3185 )
3186 .build()
3187 .await
3188 .unwrap();
3189
3190 assert_eq!(client.server_versions().await.unwrap().len(), 1);
3191
3192 // These subsequent calls hit the in-memory cache.
3193 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3194 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3195
3196 drop(client);
3197
3198 let client = Client::builder()
3199 .homeserver_url(server.uri()) // Configure this client directly so as to not hit the discovery endpoint.
3200 .store_config(
3201 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3202 .state_store(memory_store.clone()),
3203 )
3204 .build()
3205 .await
3206 .unwrap();
3207
3208 // This call to the new client hits the on-disk cache.
3209 assert_eq!(
3210 client.unstable_features().await.unwrap().get("org.matrix.e2e_cross_signing"),
3211 Some(&true)
3212 );
3213
3214 // Then this call hits the in-memory cache.
3215 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3216
3217 drop(versions_mock);
3218 drop(well_known_mock);
3219 server.verify().await;
3220
3221 // Now, reset the cache, and observe the endpoints being called again once.
3222 client.reset_server_info().await.unwrap();
3223
3224 Mock::given(method("GET"))
3225 .and(path("/.well-known/matrix/client"))
3226 .respond_with(ResponseTemplate::new(200).set_body_raw(
3227 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", server_url.as_ref()),
3228 "application/json",
3229 ))
3230 .named("second well known mock")
3231 .expect(1)
3232 .mount(&server)
3233 .await;
3234
3235 Mock::given(method("GET"))
3236 .and(path("/_matrix/client/versions"))
3237 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3238 .expect(1)
3239 .named("second versions mock")
3240 .mount(&server)
3241 .await;
3242
3243 // Hits network again.
3244 assert_eq!(client.server_versions().await.unwrap().len(), 1);
3245 // Hits in-memory cache again.
3246 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3247 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3248 }
3249
3250 #[async_test]
3251 async fn test_server_info_without_a_well_known() {
3252 let server = MockServer::start().await;
3253 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3254
3255 let versions_mock = Mock::given(method("GET"))
3256 .and(path("/_matrix/client/versions"))
3257 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3258 .named("first versions mock")
3259 .expect(1)
3260 .mount_as_scoped(&server)
3261 .await;
3262
3263 let memory_store = Arc::new(MemoryStore::new());
3264 let client = Client::builder()
3265 .homeserver_url(server.uri()) // Configure this client directly so as to not hit the discovery endpoint.
3266 .store_config(
3267 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3268 .state_store(memory_store.clone()),
3269 )
3270 .build()
3271 .await
3272 .unwrap();
3273
3274 assert_eq!(client.server_versions().await.unwrap().len(), 1);
3275
3276 // These subsequent calls hit the in-memory cache.
3277 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3278 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3279
3280 drop(client);
3281
3282 let client = Client::builder()
3283 .homeserver_url(server.uri()) // Configure this client directly so as to not hit the discovery endpoint.
3284 .store_config(
3285 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3286 .state_store(memory_store.clone()),
3287 )
3288 .build()
3289 .await
3290 .unwrap();
3291
3292 // This call to the new client hits the on-disk cache.
3293 assert_eq!(
3294 client.unstable_features().await.unwrap().get("org.matrix.e2e_cross_signing"),
3295 Some(&true)
3296 );
3297
3298 // Then this call hits the in-memory cache.
3299 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3300
3301 drop(versions_mock);
3302 server.verify().await;
3303
3304 // Now, reset the cache, and observe the endpoints being called again once.
3305 client.reset_server_info().await.unwrap();
3306
3307 Mock::given(method("GET"))
3308 .and(path("/_matrix/client/versions"))
3309 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3310 .expect(1)
3311 .named("second versions mock")
3312 .mount(&server)
3313 .await;
3314
3315 // Hits network again.
3316 assert_eq!(client.server_versions().await.unwrap().len(), 1);
3317 // Hits in-memory cache again.
3318 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3319 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3320 }
3321
3322 #[async_test]
3323 async fn test_no_network_doesnt_cause_infinite_retries() {
3324 // Note: not `no_retry_test_client` or `logged_in_client` which uses the former,
3325 // since we want infinite retries for transient errors.
3326 let client =
3327 test_client_builder(None).request_config(RequestConfig::new()).build().await.unwrap();
3328 set_client_session(&client).await;
3329
3330 // We don't define a mock server on purpose here, so that the error is really a
3331 // network error.
3332 client.whoami().await.unwrap_err();
3333 }
3334
3335 #[async_test]
3336 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3337 let (client_builder, server) = test_client_builder_with_server().await;
3338 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3339 set_client_session(&client).await;
3340
3341 let builder = Mock::given(method("GET"))
3342 .and(path("/_matrix/client/r0/sync"))
3343 .and(header("authorization", "Bearer 1234"))
3344 .and(query_param_is_missing("since"));
3345
3346 let room_id = room_id!("!room:example.org");
3347 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3348 let mut sync_response_builder = SyncResponseBuilder::new();
3349 sync_response_builder.add_joined_room(joined_room_builder);
3350 let response_body = sync_response_builder.build_json_sync_response();
3351
3352 builder
3353 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3354 .mount(&server)
3355 .await;
3356
3357 client.sync_once(SyncSettings::default()).await.unwrap();
3358
3359 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3360 assert_eq!(room.room_id(), room_id);
3361 }
3362
3363 #[async_test]
3364 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3365 let (client_builder, server) = test_client_builder_with_server().await;
3366 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3367 set_client_session(&client).await;
3368
3369 let builder = Mock::given(method("GET"))
3370 .and(path("/_matrix/client/r0/sync"))
3371 .and(header("authorization", "Bearer 1234"))
3372 .and(query_param_is_missing("since"));
3373
3374 let room_id = room_id!("!room:example.org");
3375 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3376 let mut sync_response_builder = SyncResponseBuilder::new();
3377 sync_response_builder.add_joined_room(joined_room_builder);
3378 let response_body = sync_response_builder.build_json_sync_response();
3379
3380 builder
3381 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3382 .mount(&server)
3383 .await;
3384
3385 let client = Arc::new(client);
3386
3387 // Perform the /sync request with a delay so it starts after the
3388 // `await_room_remote_echo` call has happened
3389 spawn({
3390 let client = client.clone();
3391 async move {
3392 sleep(Duration::from_millis(100)).await;
3393 client.sync_once(SyncSettings::default()).await.unwrap();
3394 }
3395 });
3396
3397 let room =
3398 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3399 assert_eq!(room.room_id(), room_id);
3400 }
3401
3402 #[async_test]
3403 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3404 let (client_builder, _) = test_client_builder_with_server().await;
3405 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3406 set_client_session(&client).await;
3407
3408 let room_id = room_id!("!room:example.org");
3409 // Room is not present so the client won't be able to find it. The call will
3410 // timeout.
3411 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3412 }
3413
3414 #[async_test]
3415 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3416 let (client_builder, server) = test_client_builder_with_server().await;
3417 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3418 set_client_session(&client).await;
3419
3420 Mock::given(method("POST"))
3421 .and(path("_matrix/client/r0/createRoom"))
3422 .and(header("authorization", "Bearer 1234"))
3423 .respond_with(
3424 ResponseTemplate::new(200).set_body_json(json!({ "room_id": "!room:example.org"})),
3425 )
3426 .mount(&server)
3427 .await;
3428
3429 // Create a room in the internal store
3430 let room = client
3431 .create_room(assign!(CreateRoomRequest::new(), {
3432 invite: vec![],
3433 is_direct: false,
3434 }))
3435 .await
3436 .unwrap();
3437
3438 // Room is locally present, but not synced, the call will timeout
3439 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3440 .await
3441 .unwrap_err();
3442 }
3443
3444 #[async_test]
3445 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3446 let server = MatrixMockServer::new().await;
3447 let client = server.client_builder().build().await;
3448
3449 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3450
3451 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3452 assert_matches!(ret, Ok(true));
3453 }
3454
3455 #[async_test]
3456 async fn test_is_room_alias_available_if_alias_is_resolved() {
3457 let server = MatrixMockServer::new().await;
3458 let client = server.client_builder().build().await;
3459
3460 server
3461 .mock_room_directory_resolve_alias()
3462 .ok("!some_room_id:matrix.org", Vec::new())
3463 .expect(1)
3464 .mount()
3465 .await;
3466
3467 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3468 assert_matches!(ret, Ok(false));
3469 }
3470
3471 #[async_test]
3472 async fn test_is_room_alias_available_if_error_found() {
3473 let server = MatrixMockServer::new().await;
3474 let client = server.client_builder().build().await;
3475
3476 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3477
3478 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3479 assert_matches!(ret, Err(_));
3480 }
3481
3482 #[async_test]
3483 async fn test_create_room_alias() {
3484 let server = MatrixMockServer::new().await;
3485 let client = server.client_builder().build().await;
3486
3487 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3488
3489 let ret = client
3490 .create_room_alias(
3491 room_alias_id!("#some_alias:matrix.org"),
3492 room_id!("!some_room:matrix.org"),
3493 )
3494 .await;
3495 assert_matches!(ret, Ok(()));
3496 }
3497
3498 #[async_test]
3499 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3500 let server = MatrixMockServer::new().await;
3501 let client = server.client_builder().build().await;
3502
3503 let room_id = room_id!("!a-room:matrix.org");
3504
3505 // Make sure the summary endpoint is called once
3506 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3507
3508 // We create a locally cached invited room
3509 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3510
3511 // And we get a preview, the server endpoint was reached
3512 let preview = client
3513 .get_room_preview(room_id.into(), Vec::new())
3514 .await
3515 .expect("Room preview should be retrieved");
3516
3517 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3518 }
3519
3520 #[async_test]
3521 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3522 let server = MatrixMockServer::new().await;
3523 let client = server.client_builder().build().await;
3524
3525 let room_id = room_id!("!a-room:matrix.org");
3526
3527 // Make sure the summary endpoint is called once
3528 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3529
3530 // We create a locally cached left room
3531 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3532
3533 // And we get a preview, the server endpoint was reached
3534 let preview = client
3535 .get_room_preview(room_id.into(), Vec::new())
3536 .await
3537 .expect("Room preview should be retrieved");
3538
3539 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3540 }
3541
3542 #[async_test]
3543 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3544 let server = MatrixMockServer::new().await;
3545 let client = server.client_builder().build().await;
3546
3547 let room_id = room_id!("!a-room:matrix.org");
3548
3549 // Make sure the summary endpoint is called once
3550 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3551
3552 // We create a locally cached knocked room
3553 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3554
3555 // And we get a preview, the server endpoint was reached
3556 let preview = client
3557 .get_room_preview(room_id.into(), Vec::new())
3558 .await
3559 .expect("Room preview should be retrieved");
3560
3561 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3562 }
3563
3564 #[async_test]
3565 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3566 let server = MatrixMockServer::new().await;
3567 let client = server.client_builder().build().await;
3568
3569 let room_id = room_id!("!a-room:matrix.org");
3570
3571 // Make sure the summary endpoint is not called
3572 server.mock_room_summary().ok(room_id).never().mount().await;
3573
3574 // We create a locally cached joined room
3575 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3576
3577 // And we get a preview, no server endpoint was reached
3578 let preview = client
3579 .get_room_preview(room_id.into(), Vec::new())
3580 .await
3581 .expect("Room preview should be retrieved");
3582
3583 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3584 }
3585
3586 #[async_test]
3587 async fn test_media_preview_config() {
3588 let server = MatrixMockServer::new().await;
3589 let client = server.client_builder().build().await;
3590
3591 server
3592 .mock_sync()
3593 .ok_and_run(&client, |builder| {
3594 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3595 "content": {
3596 "media_previews": "private",
3597 "invite_avatars": "off"
3598 },
3599 "type": "m.media_preview_config"
3600 })));
3601 })
3602 .await;
3603
3604 let (initial_value, stream) =
3605 client.account().observe_media_preview_config().await.unwrap();
3606
3607 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3608 assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3609 assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3610 pin_mut!(stream);
3611 assert_pending!(stream);
3612
3613 server
3614 .mock_sync()
3615 .ok_and_run(&client, |builder| {
3616 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3617 "content": {
3618 "media_previews": "off",
3619 "invite_avatars": "on"
3620 },
3621 "type": "m.media_preview_config"
3622 })));
3623 })
3624 .await;
3625
3626 assert_next_matches!(
3627 stream,
3628 MediaPreviewConfigEventContent {
3629 media_previews: MediaPreviews::Off,
3630 invite_avatars: InviteAvatars::On,
3631 ..
3632 }
3633 );
3634 assert_pending!(stream);
3635 }
3636
3637 #[async_test]
3638 async fn test_unstable_media_preview_config() {
3639 let server = MatrixMockServer::new().await;
3640 let client = server.client_builder().build().await;
3641
3642 server
3643 .mock_sync()
3644 .ok_and_run(&client, |builder| {
3645 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3646 "content": {
3647 "media_previews": "private",
3648 "invite_avatars": "off"
3649 },
3650 "type": "io.element.msc4278.media_preview_config"
3651 })));
3652 })
3653 .await;
3654
3655 let (initial_value, stream) =
3656 client.account().observe_media_preview_config().await.unwrap();
3657
3658 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3659 assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3660 assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3661 pin_mut!(stream);
3662 assert_pending!(stream);
3663
3664 server
3665 .mock_sync()
3666 .ok_and_run(&client, |builder| {
3667 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3668 "content": {
3669 "media_previews": "off",
3670 "invite_avatars": "on"
3671 },
3672 "type": "io.element.msc4278.media_preview_config"
3673 })));
3674 })
3675 .await;
3676
3677 assert_next_matches!(
3678 stream,
3679 MediaPreviewConfigEventContent {
3680 media_previews: MediaPreviews::Off,
3681 invite_avatars: InviteAvatars::On,
3682 ..
3683 }
3684 );
3685 assert_pending!(stream);
3686 }
3687
3688 #[async_test]
3689 async fn test_media_preview_config_not_found() {
3690 let server = MatrixMockServer::new().await;
3691 let client = server.client_builder().build().await;
3692
3693 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3694
3695 assert!(initial_value.is_none());
3696 }
3697
3698 #[async_test]
3699 async fn test_load_or_fetch_max_upload_size() {
3700 let server = MatrixMockServer::new().await;
3701 let client = server.client_builder().build().await;
3702
3703 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3704
3705 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3706 client.load_or_fetch_max_upload_size().await.unwrap();
3707
3708 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3709 }
3710
3711 #[async_test]
3712 async fn test_uploading_a_too_large_media_file() {
3713 let server = MatrixMockServer::new().await;
3714 let client = server.client_builder().build().await;
3715
3716 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3717 client.load_or_fetch_max_upload_size().await.unwrap();
3718 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3719
3720 let data = vec![1, 2];
3721 let upload_request =
3722 ruma::api::client::media::create_content::v3::Request::new(data.clone());
3723 let request = SendRequest {
3724 client: client.clone(),
3725 request: upload_request,
3726 config: None,
3727 send_progress: SharedObservable::new(TransmissionProgress::default()),
3728 };
3729 let media_request = SendMediaUploadRequest::new(request);
3730
3731 let error = media_request.await.err();
3732 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
3733 assert_eq!(max, uint!(1));
3734 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
3735 }
3736}