matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{btree_map, BTreeMap},
19 fmt::{self, Debug},
20 future::{ready, Future},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23};
24
25use caches::ClientCaches;
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::store::LockableCryptoStore;
32use matrix_sdk_base::{
33 event_cache::store::EventCacheStoreLock,
34 store::{DynStateStore, RoomLoadSettings, ServerCapabilities},
35 sync::{Notification, RoomUpdates},
36 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
37 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm,
38};
39use matrix_sdk_common::ttl_cache::TtlCache;
40#[cfg(feature = "e2e-encryption")]
41use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
42use ruma::{
43 api::{
44 client::{
45 account::whoami,
46 alias::{create_alias, delete_alias, get_alias},
47 device::{delete_devices, get_devices, update_device},
48 directory::{get_public_rooms, get_public_rooms_filtered},
49 discovery::{
50 get_capabilities::{self, Capabilities},
51 get_supported_versions,
52 },
53 error::ErrorKind,
54 filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
55 knock::knock_room,
56 membership::{join_room_by_id, join_room_by_id_or_alias},
57 room::create_room,
58 session::login::v3::DiscoveryInfo,
59 sync::sync_events,
60 uiaa,
61 user_directory::search_users,
62 },
63 error::FromHttpResponseError,
64 MatrixVersion, OutgoingRequest,
65 },
66 assign,
67 push::Ruleset,
68 time::Instant,
69 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
70 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
71};
72use serde::de::DeserializeOwned;
73use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
74use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
75use url::Url;
76
77use self::futures::SendRequest;
78use crate::{
79 authentication::{
80 matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
81 SaveSessionCallback,
82 },
83 config::RequestConfig,
84 deduplicating_handler::DeduplicatingHandler,
85 error::HttpResult,
86 event_cache::EventCache,
87 event_handler::{
88 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
89 EventHandlerStore, ObservableEventHandler, SyncEvent,
90 },
91 http_client::HttpClient,
92 notification_settings::NotificationSettings,
93 room_preview::RoomPreview,
94 send_queue::SendQueueData,
95 sliding_sync::Version as SlidingSyncVersion,
96 sync::{RoomUpdate, SyncResponse},
97 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
98 Room, SessionTokens, TransmissionProgress,
99};
100#[cfg(feature = "e2e-encryption")]
101use crate::{
102 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
103 store_locks::CrossProcessStoreLock,
104};
105
106mod builder;
107pub(crate) mod caches;
108pub(crate) mod futures;
109
110pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
111
/// Boxed, pinned future returned by a [`NotificationHandlerFn`].
///
/// Outside of wasm32 the future is additionally required to be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a [`NotificationHandlerFn`].
///
/// wasm32 variant: no `Send` bound is placed on the future.
#[cfg(target_arch = "wasm32")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
116
/// Boxed callback stored in `ClientInner::notification_handlers`.
///
/// It receives the [`Notification`], the [`Room`] and the [`Client`], and
/// returns a [`NotificationHandlerFut`]. Outside of wasm32 the callback must
/// be `Send + Sync`.
#[cfg(not(target_arch = "wasm32"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed callback stored in `ClientInner::notification_handlers`.
///
/// wasm32 variant: same call signature, without the `Send + Sync` bounds.
#[cfg(target_arch = "wasm32")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
122
/// Enum controlling whether a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
136
/// Represents changes that can occur to a `Client`'s `Session`.
#[derive(Debug, Clone, PartialEq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
148
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely;
/// every clone refers to the same [`ClientInner`].
#[derive(Clone)]
pub struct Client {
    /// The shared state behind this handle.
    pub(crate) inner: Arc<ClientInner>,
}
156
/// Locks and request-deduplicating handlers used by individual client
/// methods.
///
/// Stored in `ClientInner::locks` and created through `Default`.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): no `send_single_receipt` is defined on `ClientLocks` in the
    // visible code; the link above presumably targets a method elsewhere —
    // confirm and fix the path if so.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock over the crypto store.
    ///
    /// Held in a `OnceCell`, so it starts out unset and can be set at most
    /// once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock:
        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
236
/// The shared state behind a [`Client`].
///
/// A `Client` holds this in an `Arc`, so all clones of a `Client` operate on
/// the same `ClientInner`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates, keyed by
    /// room.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,
}
333
334impl ClientInner {
335 /// Create a new `ClientInner`.
336 ///
337 /// All the fields passed as parameters here are those that must be cloned
338 /// upon instantiation of a sub-client, e.g. a client specialized for
339 /// notifications.
340 #[allow(clippy::too_many_arguments)]
341 async fn new(
342 auth_ctx: Arc<AuthCtx>,
343 server: Option<Url>,
344 homeserver: Url,
345 sliding_sync_version: SlidingSyncVersion,
346 http_client: HttpClient,
347 base_client: BaseClient,
348 server_capabilities: ClientServerCapabilities,
349 respect_login_well_known: bool,
350 event_cache: OnceCell<EventCache>,
351 send_queue: Arc<SendQueueData>,
352 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
353 cross_process_store_locks_holder_name: String,
354 ) -> Arc<Self> {
355 let caches = ClientCaches {
356 server_capabilities: server_capabilities.into(),
357 server_metadata: Mutex::new(TtlCache::new()),
358 };
359
360 let client = Self {
361 server,
362 homeserver: StdRwLock::new(homeserver),
363 auth_ctx,
364 sliding_sync_version: StdRwLock::new(sliding_sync_version),
365 http_client,
366 base_client,
367 caches,
368 locks: Default::default(),
369 cross_process_store_locks_holder_name,
370 typing_notice_times: Default::default(),
371 event_handlers: Default::default(),
372 notification_handlers: Default::default(),
373 room_update_channels: Default::default(),
374 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
375 // ballast for all observers to catch up.
376 room_updates_sender: broadcast::Sender::new(32),
377 respect_login_well_known,
378 sync_beat: event_listener::Event::new(),
379 event_cache,
380 send_queue_data: send_queue,
381 #[cfg(feature = "e2e-encryption")]
382 e2ee: EncryptionData::new(encryption_settings),
383 #[cfg(feature = "e2e-encryption")]
384 verification_state: SharedObservable::new(VerificationState::Unknown),
385 };
386
387 #[allow(clippy::let_and_return)]
388 let client = Arc::new(client);
389
390 #[cfg(feature = "e2e-encryption")]
391 client.e2ee.initialize_room_key_tasks(&client);
392
393 let _ = client
394 .event_cache
395 .get_or_init(|| async { EventCache::new(WeakClient::from_inner(&client)) })
396 .await;
397
398 client
399 }
400}
401
402#[cfg(not(tarpaulin_include))]
403impl Debug for Client {
404 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
405 write!(fmt, "Client")
406 }
407}
408
409impl Client {
410 /// Create a new [`Client`] that will use the given homeserver.
411 ///
412 /// # Arguments
413 ///
414 /// * `homeserver_url` - The homeserver that the client should connect to.
415 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
416 Self::builder().homeserver_url(homeserver_url).build().await
417 }
418
419 /// Returns a subscriber that publishes an event every time the ignore user
420 /// list changes.
421 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
422 self.inner.base_client.subscribe_to_ignore_user_list_changes()
423 }
424
/// Create a new [`ClientBuilder`].
///
/// Equivalent to calling `ClientBuilder::new()`.
pub fn builder() -> ClientBuilder {
    ClientBuilder::new()
}
429
430 pub(crate) fn base_client(&self) -> &BaseClient {
431 &self.inner.base_client
432 }
433
434 /// The underlying HTTP client.
435 pub fn http_client(&self) -> &reqwest::Client {
436 &self.inner.http_client.inner
437 }
438
439 pub(crate) fn locks(&self) -> &ClientLocks {
440 &self.inner.locks
441 }
442
443 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
444 &self.inner.auth_ctx
445 }
446
447 /// The cross-process store locks holder name.
448 ///
449 /// The SDK provides cross-process store locks (see
450 /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
451 /// `holder_name` is the value used for all cross-process store locks
452 /// used by this `Client`.
453 pub fn cross_process_store_locks_holder_name(&self) -> &str {
454 &self.inner.cross_process_store_locks_holder_name
455 }
456
457 /// Change the homeserver URL used by this client.
458 ///
459 /// # Arguments
460 ///
461 /// * `homeserver_url` - The new URL to use.
462 fn set_homeserver(&self, homeserver_url: Url) {
463 *self.inner.homeserver.write().unwrap() = homeserver_url;
464 }
465
466 /// Get the capabilities of the homeserver.
467 ///
468 /// This method should be used to check what features are supported by the
469 /// homeserver.
470 ///
471 /// # Examples
472 ///
473 /// ```no_run
474 /// # use matrix_sdk::Client;
475 /// # use url::Url;
476 /// # async {
477 /// # let homeserver = Url::parse("http://example.com")?;
478 /// let client = Client::new(homeserver).await?;
479 ///
480 /// let capabilities = client.get_capabilities().await?;
481 ///
482 /// if capabilities.change_password.enabled {
483 /// // Change password
484 /// }
485 /// # anyhow::Ok(()) };
486 /// ```
487 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
488 let res = self.send(get_capabilities::v3::Request::new()).await?;
489 Ok(res.capabilities)
490 }
491
492 /// Get a copy of the default request config.
493 ///
494 /// The default request config is what's used when sending requests if no
495 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
496 /// function with such a parameter.
497 ///
498 /// If the default request config was not customized through
499 /// [`ClientBuilder`] when creating this `Client`, the returned value will
500 /// be equivalent to [`RequestConfig::default()`].
501 pub fn request_config(&self) -> RequestConfig {
502 self.inner.http_client.request_config
503 }
504
505 /// Check whether the client has been activated.
506 ///
507 /// A client is considered active when:
508 ///
509 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
510 /// is logged in,
511 /// 2. Has loaded cached data from storage,
512 /// 3. If encryption is enabled, it also initialized or restored its
513 /// `OlmMachine`.
514 pub fn is_active(&self) -> bool {
515 self.inner.base_client.is_active()
516 }
517
518 /// The server used by the client.
519 ///
520 /// See `Self::server` to learn more.
521 pub fn server(&self) -> Option<&Url> {
522 self.inner.server.as_ref()
523 }
524
525 /// The homeserver of the client.
526 pub fn homeserver(&self) -> Url {
527 self.inner.homeserver.read().unwrap().clone()
528 }
529
530 /// Get the sliding sync version.
531 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
532 self.inner.sliding_sync_version.read().unwrap().clone()
533 }
534
535 /// Override the sliding sync version.
536 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
537 let mut lock = self.inner.sliding_sync_version.write().unwrap();
538 *lock = version;
539 }
540
541 /// Get the Matrix user session meta information.
542 ///
543 /// If the client is currently logged in, this will return a
544 /// [`SessionMeta`] object which contains the user ID and device ID.
545 /// Otherwise it returns `None`.
546 pub fn session_meta(&self) -> Option<&SessionMeta> {
547 self.base_client().session_meta()
548 }
549
550 /// Returns a receiver that gets events for each room info update. To watch
551 /// for new events, use `receiver.resubscribe()`. Each event contains the
552 /// room and a boolean whether this event should trigger a room list update.
553 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
554 self.base_client().room_info_notable_update_receiver()
555 }
556
557 /// Performs a search for users.
558 /// The search is performed case-insensitively on user IDs and display names
559 ///
560 /// # Arguments
561 ///
562 /// * `search_term` - The search term for the search
563 /// * `limit` - The maximum number of results to return. Defaults to 10.
564 ///
565 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
566 pub async fn search_users(
567 &self,
568 search_term: &str,
569 limit: u64,
570 ) -> HttpResult<search_users::v3::Response> {
571 let mut request = search_users::v3::Request::new(search_term.to_owned());
572
573 if let Some(limit) = UInt::new(limit) {
574 request.limit = limit;
575 }
576
577 self.send(request).await
578 }
579
580 /// Get the user id of the current owner of the client.
581 pub fn user_id(&self) -> Option<&UserId> {
582 self.session_meta().map(|s| s.user_id.as_ref())
583 }
584
585 /// Get the device ID that identifies the current session.
586 pub fn device_id(&self) -> Option<&DeviceId> {
587 self.session_meta().map(|s| s.device_id.as_ref())
588 }
589
590 /// Get the current access token for this session.
591 ///
592 /// Will be `None` if the client has not been logged in.
593 pub fn access_token(&self) -> Option<String> {
594 self.auth_ctx().access_token()
595 }
596
597 /// Get the current tokens for this session.
598 ///
599 /// To be notified of changes in the session tokens, use
600 /// [`Client::subscribe_to_session_changes()`] or
601 /// [`Client::set_session_callbacks()`].
602 ///
603 /// Returns `None` if the client has not been logged in.
604 pub fn session_tokens(&self) -> Option<SessionTokens> {
605 self.auth_ctx().session_tokens()
606 }
607
608 /// Access the authentication API used to log in this client.
609 ///
610 /// Will be `None` if the client has not been logged in.
611 pub fn auth_api(&self) -> Option<AuthApi> {
612 match self.auth_ctx().auth_data.get()? {
613 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
614 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
615 }
616 }
617
618 /// Get the whole session info of this client.
619 ///
620 /// Will be `None` if the client has not been logged in.
621 ///
622 /// Can be used with [`Client::restore_session`] to restore a previously
623 /// logged-in session.
624 pub fn session(&self) -> Option<AuthSession> {
625 match self.auth_api()? {
626 AuthApi::Matrix(api) => api.session().map(Into::into),
627 AuthApi::OAuth(api) => api.full_session().map(Into::into),
628 }
629 }
630
631 /// Get a reference to the state store.
632 pub fn state_store(&self) -> &DynStateStore {
633 self.base_client().state_store()
634 }
635
636 /// Get a reference to the event cache store.
637 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
638 self.base_client().event_cache_store()
639 }
640
641 /// Access the native Matrix authentication API with this client.
642 pub fn matrix_auth(&self) -> MatrixAuth {
643 MatrixAuth::new(self.clone())
644 }
645
646 /// Get the account of the current owner of the client.
647 pub fn account(&self) -> Account {
648 Account::new(self.clone())
649 }
650
/// Get the encryption manager of the client.
///
/// The returned [`Encryption`] is built from a clone of this client. Only
/// available with the `e2e-encryption` feature.
#[cfg(feature = "e2e-encryption")]
pub fn encryption(&self) -> Encryption {
    let client = self.clone();
    Encryption::new(client)
}
656
657 /// Get the media manager of the client.
658 pub fn media(&self) -> Media {
659 Media::new(self.clone())
660 }
661
662 /// Get the pusher manager of the client.
663 pub fn pusher(&self) -> Pusher {
664 Pusher::new(self.clone())
665 }
666
667 /// Access the OAuth 2.0 API of the client.
668 pub fn oauth(&self) -> OAuth {
669 OAuth::new(self.clone())
670 }
671
672 /// Register a handler for a specific event type.
673 ///
674 /// The handler is a function or closure with one or more arguments. The
675 /// first argument is the event itself. All additional arguments are
676 /// "context" arguments: They have to implement [`EventHandlerContext`].
677 /// This trait is named that way because most of the types implementing it
678 /// give additional context about an event: The room it was in, its raw form
679 /// and other similar things. As two exceptions to this,
680 /// [`Client`] and [`EventHandlerHandle`] also implement the
681 /// `EventHandlerContext` trait so you don't have to clone your client
682 /// into the event handler manually and a handler can decide to remove
683 /// itself.
684 ///
685 /// Some context arguments are not universally applicable. A context
686 /// argument that isn't available for the given event type will result in
687 /// the event handler being skipped and an error being logged. The following
688 /// context argument types are only available for a subset of event types:
689 ///
690 /// * [`Room`] is only available for room-specific events, i.e. not for
691 /// events like global account data events or presence events.
692 ///
693 /// You can provide custom context via
694 /// [`add_event_handler_context`](Client::add_event_handler_context) and
695 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
696 /// into the event handler.
697 ///
698 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
699 ///
700 /// # Examples
701 ///
702 /// ```no_run
703 /// use matrix_sdk::{
704 /// deserialized_responses::EncryptionInfo,
705 /// event_handler::Ctx,
706 /// ruma::{
707 /// events::{
708 /// macros::EventContent,
709 /// push_rules::PushRulesEvent,
710 /// room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
711 /// },
712 /// push::Action,
713 /// Int, MilliSecondsSinceUnixEpoch,
714 /// },
715 /// Client, Room,
716 /// };
717 /// use serde::{Deserialize, Serialize};
718 ///
719 /// # async fn example(client: Client) {
720 /// client.add_event_handler(
721 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
722 /// // Common usage: Room event plus room and client.
723 /// },
724 /// );
725 /// client.add_event_handler(
726 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
727 /// async move {
728 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
729 /// // unencrypted events and events that were decrypted by the SDK.
730 /// }
731 /// },
732 /// );
733 /// client.add_event_handler(
734 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
735 /// async move {
736 /// // A `Vec<Action>` parameter allows you to know which push actions
737 /// // are applicable for an event. For example, an event with
738 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
739 /// // in the timeline.
740 /// }
741 /// },
742 /// );
743 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
744 /// // You can omit any or all arguments after the first.
745 /// });
746 ///
747 /// // Registering a temporary event handler:
748 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
749 /// /* Event handler */
750 /// });
751 /// client.remove_event_handler(handle);
752 ///
753 /// // Registering custom event handler context:
754 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
755 /// struct MyContext {
756 /// number: usize,
757 /// }
758 /// client.add_event_handler_context(MyContext { number: 5 });
759 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
760 /// // Use the context
761 /// });
762 ///
763 /// // Custom events work exactly the same way, you just need to declare
764 /// // the content struct and use the EventContent derive macro on it.
765 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
766 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
767 /// struct TokenEventContent {
768 /// token: String,
769 /// #[serde(rename = "exp")]
770 /// expires_at: MilliSecondsSinceUnixEpoch,
771 /// }
772 ///
773 /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
774 /// todo!("Display the token");
775 /// });
776 ///
777 /// // Event handler closures can also capture local variables.
778 /// // Make sure they are cheap to clone though, because they will be cloned
779 /// // every time the closure is called.
780 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
781 ///
782 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
783 /// println!("Calling the handler with identifier {data}");
784 /// });
785 /// # }
786 /// ```
787 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
788 where
789 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
790 H: EventHandler<Ev, Ctx>,
791 {
792 self.add_event_handler_impl(handler, None)
793 }
794
795 /// Register a handler for a specific room, and event type.
796 ///
797 /// This method works the same way as
798 /// [`add_event_handler`][Self::add_event_handler], except that the handler
799 /// will only be called for events in the room with the specified ID. See
800 /// that method for more details on event handler functions.
801 ///
802 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
803 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
804 /// your use case.
805 pub fn add_room_event_handler<Ev, Ctx, H>(
806 &self,
807 room_id: &RoomId,
808 handler: H,
809 ) -> EventHandlerHandle
810 where
811 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
812 H: EventHandler<Ev, Ctx>,
813 {
814 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
815 }
816
817 /// Observe a specific event type.
818 ///
819 /// `Ev` represents the kind of event that will be observed. `Ctx`
820 /// represents the context that will come with the event. It relies on the
821 /// same mechanism as [`Client::add_event_handler`]. The main difference is
822 /// that it returns an [`ObservableEventHandler`] and doesn't require a
823 /// user-defined closure. It is possible to subscribe to the
824 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
825 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
826 /// Ctx)`.
827 ///
828 /// Be careful that only the most recent value can be observed. Subscribers
829 /// are notified when a new value is sent, but there is no guarantee
830 /// that they will see all values.
831 ///
832 /// # Example
833 ///
834 /// Let's see a classical usage:
835 ///
836 /// ```
837 /// use futures_util::StreamExt as _;
838 /// use matrix_sdk::{
839 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
840 /// Client, Room,
841 /// };
842 ///
843 /// # async fn example(client: Client) -> Option<()> {
844 /// let observer =
845 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
846 ///
847 /// let mut subscriber = observer.subscribe();
848 ///
849 /// let (event, (room, push_actions)) = subscriber.next().await?;
850 /// # Some(())
851 /// # }
852 /// ```
853 ///
854 /// Now let's see how to get several contexts that can be useful for you:
855 ///
856 /// ```
857 /// use matrix_sdk::{
858 /// deserialized_responses::EncryptionInfo,
859 /// ruma::{
860 /// events::room::{
861 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
862 /// },
863 /// push::Action,
864 /// },
865 /// Client, Room,
866 /// };
867 ///
868 /// # async fn example(client: Client) {
869 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
870 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
871 ///
872 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
873 /// // to distinguish between unencrypted events and events that were decrypted
874 /// // by the SDK.
875 /// let _ = client
876 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
877 /// );
878 ///
879 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
880 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
881 /// // should be highlighted in the timeline.
882 /// let _ =
883 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
884 ///
885 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
886 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
887 /// # }
888 /// ```
889 ///
890 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
891 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
892 where
893 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
894 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
895 {
896 self.observe_room_events_impl(None)
897 }
898
899 /// Observe a specific room, and event type.
900 ///
901 /// This method works the same way as [`Client::observe_events`], except
902 /// that the observability will only be applied for events in the room with
903 /// the specified ID. See that method for more details.
904 ///
905 /// Be careful that only the most recent value can be observed. Subscribers
906 /// are notified when a new value is sent, but there is no guarantee
907 /// that they will see all values.
908 pub fn observe_room_events<Ev, Ctx>(
909 &self,
910 room_id: &RoomId,
911 ) -> ObservableEventHandler<(Ev, Ctx)>
912 where
913 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
914 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
915 {
916 self.observe_room_events_impl(Some(room_id.to_owned()))
917 }
918
919 /// Shared implementation for `Client::observe_events` and
920 /// `Client::observe_room_events`.
921 fn observe_room_events_impl<Ev, Ctx>(
922 &self,
923 room_id: Option<OwnedRoomId>,
924 ) -> ObservableEventHandler<(Ev, Ctx)>
925 where
926 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
927 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
928 {
929 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
930 // new value.
931 let shared_observable = SharedObservable::new(None);
932
933 ObservableEventHandler::new(
934 shared_observable.clone(),
935 self.event_handler_drop_guard(self.add_event_handler_impl(
936 move |event: Ev, context: Ctx| {
937 shared_observable.set(Some((event, context)));
938
939 ready(())
940 },
941 room_id,
942 )),
943 )
944 }
945
946 /// Remove the event handler associated with the handle.
947 ///
948 /// Note that you **must not** call `remove_event_handler` from the
949 /// non-async part of an event handler, that is:
950 ///
951 /// ```ignore
952 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
953 /// // ⚠ this will cause a deadlock ⚠
954 /// client.remove_event_handler(handle);
955 ///
956 /// async move {
957 /// // removing the event handler here is fine
958 /// client.remove_event_handler(handle);
959 /// }
960 /// })
961 /// ```
962 ///
963 /// Note also that handlers that remove themselves will still execute with
964 /// events received in the same sync cycle.
965 ///
966 /// # Arguments
967 ///
968 /// `handle` - The [`EventHandlerHandle`] that is returned when
969 /// registering the event handler with [`Client::add_event_handler`].
970 ///
971 /// # Examples
972 ///
973 /// ```no_run
974 /// # use url::Url;
975 /// # use tokio::sync::mpsc;
976 /// #
977 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
978 /// #
979 /// use matrix_sdk::{
980 /// event_handler::EventHandlerHandle,
981 /// ruma::events::room::member::SyncRoomMemberEvent, Client,
982 /// };
983 /// #
984 /// # futures_executor::block_on(async {
985 /// # let client = matrix_sdk::Client::builder()
986 /// # .homeserver_url(homeserver)
987 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
988 /// # .build()
989 /// # .await
990 /// # .unwrap();
991 ///
992 /// client.add_event_handler(
993 /// |ev: SyncRoomMemberEvent,
994 /// client: Client,
995 /// handle: EventHandlerHandle| async move {
996 /// // Common usage: Check arriving Event is the expected one
997 /// println!("Expected RoomMemberEvent received!");
998 /// client.remove_event_handler(handle);
999 /// },
1000 /// );
1001 /// # });
1002 /// ```
1003 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1004 self.inner.event_handlers.remove(handle);
1005 }
1006
1007 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1008 /// the given handle.
1009 ///
1010 /// When the returned value is dropped, the event handler will be removed.
1011 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1012 EventHandlerDropGuard::new(handle, self.clone())
1013 }
1014
1015 /// Add an arbitrary value for use as event handler context.
1016 ///
1017 /// The value can be obtained in an event handler by adding an argument of
1018 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1019 ///
1020 /// If a value of the same type has been added before, it will be
1021 /// overwritten.
1022 ///
1023 /// # Examples
1024 ///
1025 /// ```no_run
1026 /// use matrix_sdk::{
1027 /// event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1028 /// Room,
1029 /// };
1030 /// # #[derive(Clone)]
1031 /// # struct SomeType;
1032 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1033 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1034 /// # futures_executor::block_on(async {
1035 /// # let client = matrix_sdk::Client::builder()
1036 /// # .homeserver_url(homeserver)
1037 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1038 /// # .build()
1039 /// # .await
1040 /// # .unwrap();
1041 ///
1042 /// // Handle used to send messages to the UI part of the app
1043 /// let my_gui_handle: SomeType = obtain_gui_handle();
1044 ///
1045 /// client.add_event_handler_context(my_gui_handle.clone());
1046 /// client.add_event_handler(
1047 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1048 /// async move {
1049 /// // gui_handle.send(DisplayMessage { message: ev });
1050 /// }
1051 /// },
1052 /// );
1053 /// # });
1054 /// ```
1055 pub fn add_event_handler_context<T>(&self, ctx: T)
1056 where
1057 T: Clone + Send + Sync + 'static,
1058 {
1059 self.inner.event_handlers.add_context(ctx);
1060 }
1061
1062 /// Register a handler for a notification.
1063 ///
1064 /// Similar to [`Client::add_event_handler`], but only allows functions
1065 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1066 /// [`Client`] for now.
1067 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1068 where
1069 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1070 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1071 {
1072 self.inner.notification_handlers.write().await.push(Box::new(
1073 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1074 ));
1075
1076 self
1077 }
1078
1079 /// Subscribe to all updates for the room with the given ID.
1080 ///
1081 /// The returned receiver will receive a new message for each sync response
1082 /// that contains updates for that room.
1083 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1084 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1085 btree_map::Entry::Vacant(entry) => {
1086 let (tx, rx) = broadcast::channel(8);
1087 entry.insert(tx);
1088 rx
1089 }
1090 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1091 }
1092 }
1093
1094 /// Subscribe to all updates to all rooms, whenever any has been received in
1095 /// a sync response.
1096 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1097 self.inner.room_updates_sender.subscribe()
1098 }
1099
1100 pub(crate) async fn notification_handlers(
1101 &self,
1102 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1103 self.inner.notification_handlers.read().await
1104 }
1105
1106 /// Get all the rooms the client knows about.
1107 ///
1108 /// This will return the list of joined, invited, and left rooms.
1109 pub fn rooms(&self) -> Vec<Room> {
1110 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1111 }
1112
1113 /// Get all the rooms the client knows about, filtered by room state.
1114 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1115 self.base_client()
1116 .rooms_filtered(filter)
1117 .into_iter()
1118 .map(|room| Room::new(self.clone(), room))
1119 .collect()
1120 }
1121
1122 /// Get a stream of all the rooms, in addition to the existing rooms.
1123 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1124 let (rooms, stream) = self.base_client().rooms_stream();
1125
1126 let map_room = |room| Room::new(self.clone(), room);
1127
1128 (
1129 rooms.into_iter().map(map_room).collect(),
1130 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1131 )
1132 }
1133
1134 /// Returns the joined rooms this client knows about.
1135 pub fn joined_rooms(&self) -> Vec<Room> {
1136 self.base_client()
1137 .rooms_filtered(RoomStateFilter::JOINED)
1138 .into_iter()
1139 .map(|room| Room::new(self.clone(), room))
1140 .collect()
1141 }
1142
1143 /// Returns the invited rooms this client knows about.
1144 pub fn invited_rooms(&self) -> Vec<Room> {
1145 self.base_client()
1146 .rooms_filtered(RoomStateFilter::INVITED)
1147 .into_iter()
1148 .map(|room| Room::new(self.clone(), room))
1149 .collect()
1150 }
1151
1152 /// Returns the left rooms this client knows about.
1153 pub fn left_rooms(&self) -> Vec<Room> {
1154 self.base_client()
1155 .rooms_filtered(RoomStateFilter::LEFT)
1156 .into_iter()
1157 .map(|room| Room::new(self.clone(), room))
1158 .collect()
1159 }
1160
1161 /// Get a room with the given room id.
1162 ///
1163 /// # Arguments
1164 ///
1165 /// `room_id` - The unique id of the room that should be fetched.
1166 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1167 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1168 }
1169
1170 /// Gets the preview of a room, whether the current user has joined it or
1171 /// not.
1172 pub async fn get_room_preview(
1173 &self,
1174 room_or_alias_id: &RoomOrAliasId,
1175 via: Vec<OwnedServerName>,
1176 ) -> Result<RoomPreview> {
1177 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1178 Ok(room_id) => room_id.to_owned(),
1179 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1180 };
1181
1182 if let Some(room) = self.get_room(&room_id) {
1183 // The cached data can only be trusted if the room state is joined or
1184 // banned: for invite and knock rooms, no updates will be received
1185 // for the rooms after the invite/knock action took place so we may
1186 // have very out to date data for important fields such as
1187 // `join_rule`. For left rooms, the homeserver should return the latest info.
1188 match room.state() {
1189 RoomState::Joined | RoomState::Banned => {
1190 return Ok(RoomPreview::from_known_room(&room).await);
1191 }
1192 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1193 }
1194 }
1195
1196 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1197 }
1198
1199 /// Resolve a room alias to a room id and a list of servers which know
1200 /// about it.
1201 ///
1202 /// # Arguments
1203 ///
1204 /// `room_alias` - The room alias to be resolved.
1205 pub async fn resolve_room_alias(
1206 &self,
1207 room_alias: &RoomAliasId,
1208 ) -> HttpResult<get_alias::v3::Response> {
1209 let request = get_alias::v3::Request::new(room_alias.to_owned());
1210 self.send(request).await
1211 }
1212
1213 /// Checks if a room alias is not in use yet.
1214 ///
1215 /// Returns:
/// - `Ok(true)` if the room alias is available (the resolve alias request
///   returned a `404` status code).
/// - `Ok(false)` if it's not (the alias was resolved, so it's already in
///   use).
1219 /// - An `Err` otherwise.
1220 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1221 match self.resolve_room_alias(alias).await {
1222 // The room alias was resolved, so it's already in use.
1223 Ok(_) => Ok(false),
1224 Err(error) => {
1225 match error.client_api_error_kind() {
1226 // The room alias wasn't found, so it's available.
1227 Some(ErrorKind::NotFound) => Ok(true),
1228 _ => Err(error),
1229 }
1230 }
1231 }
1232 }
1233
1234 /// Adds a new room alias associated with a room to the room directory.
1235 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1236 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1237 self.send(request).await?;
1238 Ok(())
1239 }
1240
1241 /// Removes a room alias from the room directory.
1242 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1243 let request = delete_alias::v3::Request::new(alias.to_owned());
1244 self.send(request).await?;
1245 Ok(())
1246 }
1247
1248 /// Update the homeserver from the login response well-known if needed.
1249 ///
1250 /// # Arguments
1251 ///
1252 /// * `login_well_known` - The `well_known` field from a successful login
1253 /// response.
1254 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1255 if self.inner.respect_login_well_known {
1256 if let Some(well_known) = login_well_known {
1257 if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1258 self.set_homeserver(homeserver);
1259 }
1260 }
1261 }
1262 }
1263
1264 /// Similar to [`Client::restore_session_with`], with
1265 /// [`RoomLoadSettings::default()`].
1266 ///
1267 /// # Panics
1268 ///
1269 /// Panics if a session was already restored or logged in.
1270 #[instrument(skip_all)]
1271 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1272 self.restore_session_with(session, RoomLoadSettings::default()).await
1273 }
1274
1275 /// Restore a session previously logged-in using one of the available
1276 /// authentication APIs. The number of rooms to restore is controlled by
1277 /// [`RoomLoadSettings`].
1278 ///
1279 /// See the documentation of the corresponding authentication API's
1280 /// `restore_session` method for more information.
1281 ///
1282 /// # Panics
1283 ///
1284 /// Panics if a session was already restored or logged in.
1285 #[instrument(skip_all)]
1286 pub async fn restore_session_with(
1287 &self,
1288 session: impl Into<AuthSession>,
1289 room_load_settings: RoomLoadSettings,
1290 ) -> Result<()> {
1291 let session = session.into();
1292 match session {
1293 AuthSession::Matrix(session) => {
1294 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1295 }
1296 AuthSession::OAuth(session) => {
1297 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1298 }
1299 }
1300 }
1301
1302 /// Refresh the access token using the authentication API used to log into
1303 /// this session.
1304 ///
1305 /// See the documentation of the authentication API's `refresh_access_token`
1306 /// method for more information.
1307 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1308 let Some(auth_api) = self.auth_api() else {
1309 return Err(RefreshTokenError::RefreshTokenRequired);
1310 };
1311
1312 match auth_api {
1313 AuthApi::Matrix(api) => {
1314 trace!("Token refresh: Using the homeserver.");
1315 Box::pin(api.refresh_access_token()).await?;
1316 }
1317 AuthApi::OAuth(api) => {
1318 trace!("Token refresh: Using OAuth 2.0.");
1319 Box::pin(api.refresh_access_token()).await?;
1320 }
1321 }
1322
1323 Ok(())
1324 }
1325
1326 /// Get or upload a sync filter.
1327 ///
1328 /// This method will either get a filter ID from the store or upload the
1329 /// filter definition to the homeserver and return the new filter ID.
1330 ///
1331 /// # Arguments
1332 ///
1333 /// * `filter_name` - The unique name of the filter, this name will be used
1334 /// locally to store and identify the filter ID returned by the server.
1335 ///
1336 /// * `definition` - The filter definition that should be uploaded to the
1337 /// server if no filter ID can be found in the store.
1338 ///
1339 /// # Examples
1340 ///
1341 /// ```no_run
1342 /// # use matrix_sdk::{
1343 /// # Client, config::SyncSettings,
1344 /// # ruma::api::client::{
1345 /// # filter::{
1346 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1347 /// # },
1348 /// # sync::sync_events::v3::Filter,
1349 /// # }
1350 /// # };
1351 /// # use url::Url;
1352 /// # async {
1353 /// # let homeserver = Url::parse("http://example.com").unwrap();
1354 /// # let client = Client::new(homeserver).await.unwrap();
1355 /// let mut filter = FilterDefinition::default();
1356 ///
1357 /// // Let's enable member lazy loading.
1358 /// filter.room.state.lazy_load_options =
1359 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1360 ///
1361 /// let filter_id = client
1362 /// .get_or_upload_filter("sync", filter)
1363 /// .await
1364 /// .unwrap();
1365 ///
1366 /// let sync_settings = SyncSettings::new()
1367 /// .filter(Filter::FilterId(filter_id));
1368 ///
1369 /// let response = client.sync_once(sync_settings).await.unwrap();
/// # };
/// ```
1371 #[instrument(skip(self, definition))]
1372 pub async fn get_or_upload_filter(
1373 &self,
1374 filter_name: &str,
1375 definition: FilterDefinition,
1376 ) -> Result<String> {
1377 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1378 debug!("Found filter locally");
1379 Ok(filter)
1380 } else {
1381 debug!("Didn't find filter locally");
1382 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1383 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1384 let response = self.send(request).await?;
1385
1386 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1387
1388 Ok(response.filter_id)
1389 }
1390 }
1391
1392 /// Join a room by `RoomId`.
1393 ///
/// Sends a `join_room_by_id` request and returns the joined [`Room`],
/// looked up from the room ID contained in the response.
1396 ///
1397 /// # Arguments
1398 ///
1399 /// * `room_id` - The `RoomId` of the room to be joined.
1400 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1401 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1402 let response = self.send(request).await?;
1403 let base_room = self.base_client().room_joined(&response.room_id).await?;
1404 Ok(Room::new(self.clone(), base_room))
1405 }
1406
/// Join a room by `RoomId` or `RoomAliasId`.
///
/// Sends a `join_room_by_id_or_alias` request and returns the joined
/// [`Room`], looked up from the room ID contained in the response.
1411 ///
1412 /// # Arguments
1413 ///
/// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
/// alias looks like `#name:example.com`.
///
/// * `server_names` - The server names that are sent in the `via` field of
/// the join request.
1416 pub async fn join_room_by_id_or_alias(
1417 &self,
1418 alias: &RoomOrAliasId,
1419 server_names: &[OwnedServerName],
1420 ) -> Result<Room> {
1421 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1422 via: server_names.to_owned(),
1423 });
1424 let response = self.send(request).await?;
1425 let base_room = self.base_client().room_joined(&response.room_id).await?;
1426 Ok(Room::new(self.clone(), base_room))
1427 }
1428
1429 /// Search the homeserver's directory of public rooms.
1430 ///
1431 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1432 /// a `get_public_rooms::Response`.
1433 ///
1434 /// # Arguments
1435 ///
1436 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1437 ///
1438 /// * `since` - Pagination token from a previous request.
1439 ///
1440 /// * `server` - The name of the server, if `None` the requested server is
1441 /// used.
1442 ///
1443 /// # Examples
1444 /// ```no_run
1445 /// use matrix_sdk::Client;
1446 /// # use url::Url;
1447 /// # let homeserver = Url::parse("http://example.com").unwrap();
1448 /// # let limit = Some(10);
1449 /// # let since = Some("since token");
1450 /// # let server = Some("servername.com".try_into().unwrap());
1451 /// # async {
1452 /// let mut client = Client::new(homeserver).await.unwrap();
1453 ///
1454 /// client.public_rooms(limit, since, server).await;
1455 /// # };
1456 /// ```
1457 #[cfg_attr(not(target_arch = "wasm32"), deny(clippy::future_not_send))]
1458 pub async fn public_rooms(
1459 &self,
1460 limit: Option<u32>,
1461 since: Option<&str>,
1462 server: Option<&ServerName>,
1463 ) -> HttpResult<get_public_rooms::v3::Response> {
1464 let limit = limit.map(UInt::from);
1465
1466 let request = assign!(get_public_rooms::v3::Request::new(), {
1467 limit,
1468 since: since.map(ToOwned::to_owned),
1469 server: server.map(ToOwned::to_owned),
1470 });
1471 self.send(request).await
1472 }
1473
1474 /// Create a room with the given parameters.
1475 ///
1476 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1477 /// created room.
1478 ///
1479 /// If you want to create a direct message with one specific user, you can
1480 /// use [`create_dm`][Self::create_dm], which is more convenient than
1481 /// assembling the [`create_room::v3::Request`] yourself.
1482 ///
1483 /// If the `is_direct` field of the request is set to `true` and at least
1484 /// one user is invited, the room will be automatically added to the direct
1485 /// rooms in the account data.
1486 ///
1487 /// # Examples
1488 ///
1489 /// ```no_run
1490 /// use matrix_sdk::{
1491 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1492 /// Client,
1493 /// };
1494 /// # use url::Url;
1495 /// #
1496 /// # async {
1497 /// # let homeserver = Url::parse("http://example.com").unwrap();
1498 /// let request = CreateRoomRequest::new();
1499 /// let client = Client::new(homeserver).await.unwrap();
1500 /// assert!(client.create_room(request).await.is_ok());
1501 /// # };
1502 /// ```
1503 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1504 let invite = request.invite.clone();
1505 let is_direct_room = request.is_direct;
1506 let response = self.send(request).await?;
1507
1508 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1509
1510 let joined_room = Room::new(self.clone(), base_room);
1511
1512 if is_direct_room && !invite.is_empty() {
1513 if let Err(error) =
1514 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1515 {
1516 // FIXME: Retry in the background
1517 error!("Failed to mark room as DM: {error}");
1518 }
1519 }
1520
1521 Ok(joined_room)
1522 }
1523
1524 /// Create a DM room.
1525 ///
1526 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1527 /// given user being invited, the room marked `is_direct` and both the
1528 /// creator and invitee getting the default maximum power level.
1529 ///
1530 /// If the `e2e-encryption` feature is enabled, the room will also be
1531 /// encrypted.
1532 ///
1533 /// # Arguments
1534 ///
1535 /// * `user_id` - The ID of the user to create a DM for.
1536 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1537 #[cfg(feature = "e2e-encryption")]
1538 let initial_state =
1539 vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1540 .to_raw_any()];
1541
1542 #[cfg(not(feature = "e2e-encryption"))]
1543 let initial_state = vec![];
1544
1545 let request = assign!(create_room::v3::Request::new(), {
1546 invite: vec![user_id.to_owned()],
1547 is_direct: true,
1548 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1549 initial_state,
1550 });
1551
1552 self.create_room(request).await
1553 }
1554
1555 /// Search the homeserver's directory for public rooms with a filter.
1556 ///
1557 /// # Arguments
1558 ///
1559 /// * `room_search` - The easiest way to create this request is using the
1560 /// `get_public_rooms_filtered::Request` itself.
1561 ///
1562 /// # Examples
1563 ///
1564 /// ```no_run
1565 /// # use url::Url;
1566 /// # use matrix_sdk::Client;
1567 /// # async {
1568 /// # let homeserver = Url::parse("http://example.com")?;
1569 /// use matrix_sdk::ruma::{
1570 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1571 /// };
1572 /// # let mut client = Client::new(homeserver).await?;
1573 ///
1574 /// let mut filter = Filter::new();
1575 /// filter.generic_search_term = Some("rust".to_owned());
1576 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1577 /// request.filter = filter;
1578 ///
1579 /// let response = client.public_rooms_filtered(request).await?;
1580 ///
1581 /// for room in response.chunk {
1582 /// println!("Found room {room:?}");
1583 /// }
1584 /// # anyhow::Ok(()) };
1585 /// ```
1586 pub async fn public_rooms_filtered(
1587 &self,
1588 request: get_public_rooms_filtered::v3::Request,
1589 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1590 self.send(request).await
1591 }
1592
1593 /// Send an arbitrary request to the server, without updating client state.
1594 ///
1595 /// **Warning:** Because this method *does not* update the client state, it
1596 /// is important to make sure that you account for this yourself, and
1597 /// use wrapper methods where available. This method should *only* be
1598 /// used if a wrapper method for the endpoint you'd like to use is not
1599 /// available.
1600 ///
1601 /// # Arguments
1602 ///
1603 /// * `request` - A filled out and valid request for the endpoint to be hit
1604 ///
/// Note that this method takes no timeout argument: the returned
/// [`SendRequest`] starts without a request configuration override, so the
/// default request settings apply unless one is set on it.
1607 ///
1608 /// # Examples
1609 ///
1610 /// ```no_run
1611 /// # use matrix_sdk::{Client, config::SyncSettings};
1612 /// # use url::Url;
1613 /// # async {
1614 /// # let homeserver = Url::parse("http://localhost:8080")?;
1615 /// # let mut client = Client::new(homeserver).await?;
1616 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1617 ///
1618 /// // First construct the request you want to make
1619 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1620 /// // for all available Endpoints
1621 /// let user_id = user_id!("@example:localhost").to_owned();
1622 /// let request = profile::get_profile::v3::Request::new(user_id);
1623 ///
1624 /// // Start the request using Client::send()
1625 /// let response = client.send(request).await?;
1626 ///
1627 /// // Check the corresponding Response struct to find out what types are
1628 /// // returned
1629 /// # anyhow::Ok(()) };
1630 /// ```
1631 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1632 where
1633 Request: OutgoingRequest + Clone + Debug,
1634 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1635 {
1636 SendRequest {
1637 client: self.clone(),
1638 request,
1639 config: None,
1640 send_progress: Default::default(),
1641 }
1642 }
1643
1644 pub(crate) async fn send_inner<Request>(
1645 &self,
1646 request: Request,
1647 config: Option<RequestConfig>,
1648 send_progress: SharedObservable<TransmissionProgress>,
1649 ) -> HttpResult<Request::IncomingResponse>
1650 where
1651 Request: OutgoingRequest + Debug,
1652 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1653 {
1654 let homeserver = self.homeserver().to_string();
1655 let access_token = self.access_token();
1656
1657 self.inner
1658 .http_client
1659 .send(
1660 request,
1661 config,
1662 homeserver,
1663 access_token.as_deref(),
1664 &self.server_versions().await?,
1665 send_progress,
1666 )
1667 .await
1668 }
1669
1670 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1671 _ = self
1672 .inner
1673 .auth_ctx
1674 .session_change_sender
1675 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1676 }
1677
1678 /// Fetches server capabilities from network; no caching.
1679 pub async fn fetch_server_capabilities(
1680 &self,
1681 request_config: Option<RequestConfig>,
1682 ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1683 let resp = self
1684 .inner
1685 .http_client
1686 .send(
1687 get_supported_versions::Request::new(),
1688 request_config,
1689 self.homeserver().to_string(),
1690 None,
1691 &[MatrixVersion::V1_0],
1692 Default::default(),
1693 )
1694 .await?;
1695
1696 // Fill both unstable features and server versions at once.
1697 let mut versions = resp.known_versions().collect::<Vec<_>>();
1698 if versions.is_empty() {
1699 versions.push(MatrixVersion::V1_0);
1700 }
1701
1702 Ok((versions.into(), resp.unstable_features))
1703 }
1704
1705 /// Load server capabilities from storage, or fetch them from network and
1706 /// cache them.
1707 async fn load_or_fetch_server_capabilities(
1708 &self,
1709 ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1710 match self.state_store().get_kv_data(StateStoreDataKey::ServerCapabilities).await {
1711 Ok(Some(stored)) => {
1712 if let Some((versions, unstable_features)) =
1713 stored.into_server_capabilities().and_then(|cap| cap.maybe_decode())
1714 {
1715 return Ok((versions.into(), unstable_features));
1716 }
1717 }
1718 Ok(None) => {
1719 // fallthrough: cache is empty
1720 }
1721 Err(err) => {
1722 warn!("error when loading cached server capabilities: {err}");
1723 // fallthrough to network.
1724 }
1725 }
1726
1727 let (versions, unstable_features) = self.fetch_server_capabilities(None).await?;
1728
1729 // Attempt to cache the result in storage.
1730 {
1731 let encoded = ServerCapabilities::new(&versions, unstable_features.clone());
1732 if let Err(err) = self
1733 .state_store()
1734 .set_kv_data(
1735 StateStoreDataKey::ServerCapabilities,
1736 StateStoreDataValue::ServerCapabilities(encoded),
1737 )
1738 .await
1739 {
1740 warn!("error when caching server capabilities: {err}");
1741 }
1742 }
1743
1744 Ok((versions, unstable_features))
1745 }
1746
1747 async fn get_or_load_and_cache_server_capabilities<
1748 T,
1749 F: Fn(&ClientServerCapabilities) -> Option<T>,
1750 >(
1751 &self,
1752 f: F,
1753 ) -> HttpResult<T> {
1754 let caps = &self.inner.caches.server_capabilities;
1755 if let Some(val) = f(&*caps.read().await) {
1756 return Ok(val);
1757 }
1758
1759 let mut guard = caps.write().await;
1760 if let Some(val) = f(&guard) {
1761 return Ok(val);
1762 }
1763
1764 let (versions, unstable_features) = self.load_or_fetch_server_capabilities().await?;
1765
1766 guard.server_versions = Some(versions);
1767 guard.unstable_features = Some(unstable_features);
1768
1769 // SAFETY: both fields were set above, so the function will always return some.
1770 Ok(f(&guard).unwrap())
1771 }
1772
1773 /// Get the Matrix versions supported by the homeserver by fetching them
1774 /// from the server or the cache.
1775 ///
1776 /// # Examples
1777 ///
1778 /// ```no_run
1779 /// use ruma::api::MatrixVersion;
1780 /// # use matrix_sdk::{Client, config::SyncSettings};
1781 /// # use url::Url;
1782 /// # async {
1783 /// # let homeserver = Url::parse("http://localhost:8080")?;
1784 /// # let mut client = Client::new(homeserver).await?;
1785 ///
1786 /// let server_versions = client.server_versions().await?;
1787 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
1788 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1789 /// # anyhow::Ok(()) };
1790 /// ```
1791 pub async fn server_versions(&self) -> HttpResult<Box<[MatrixVersion]>> {
1792 self.get_or_load_and_cache_server_capabilities(|caps| caps.server_versions.clone()).await
1793 }
1794
1795 /// Get the unstable features supported by the homeserver by fetching them
1796 /// from the server or the cache.
1797 ///
1798 /// # Examples
1799 ///
1800 /// ```no_run
1801 /// # use matrix_sdk::{Client, config::SyncSettings};
1802 /// # use url::Url;
1803 /// # async {
1804 /// # let homeserver = Url::parse("http://localhost:8080")?;
1805 /// # let mut client = Client::new(homeserver).await?;
1806 /// let unstable_features = client.unstable_features().await?;
1807 /// let supports_msc_x =
1808 /// unstable_features.get("msc_x").copied().unwrap_or(false);
1809 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
1810 /// # anyhow::Ok(()) };
1811 /// ```
1812 pub async fn unstable_features(&self) -> HttpResult<BTreeMap<String, bool>> {
1813 self.get_or_load_and_cache_server_capabilities(|caps| caps.unstable_features.clone()).await
1814 }
1815
1816 /// Empty the server version and unstable features cache.
1817 ///
1818 /// Since the SDK caches server capabilities (versions and unstable
1819 /// features), it's possible to have a stale entry in the cache. This
1820 /// functions makes it possible to force reset it.
1821 pub async fn reset_server_capabilities(&self) -> Result<()> {
1822 // Empty the in-memory caches.
1823 let mut guard = self.inner.caches.server_capabilities.write().await;
1824 guard.server_versions = None;
1825 guard.unstable_features = None;
1826
1827 // Empty the store cache.
1828 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerCapabilities).await?)
1829 }
1830
1831 /// Check whether MSC 4028 is enabled on the homeserver.
1832 ///
1833 /// # Examples
1834 ///
1835 /// ```no_run
1836 /// # use matrix_sdk::{Client, config::SyncSettings};
1837 /// # use url::Url;
1838 /// # async {
1839 /// # let homeserver = Url::parse("http://localhost:8080")?;
1840 /// # let mut client = Client::new(homeserver).await?;
1841 /// let msc4028_enabled =
1842 /// client.can_homeserver_push_encrypted_event_to_device().await?;
1843 /// # anyhow::Ok(()) };
1844 /// ```
1845 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
1846 Ok(self.unstable_features().await?.get("org.matrix.msc4028").copied().unwrap_or(false))
1847 }
1848
1849 /// Get information of all our own devices.
1850 ///
1851 /// # Examples
1852 ///
1853 /// ```no_run
1854 /// # use matrix_sdk::{Client, config::SyncSettings};
1855 /// # use url::Url;
1856 /// # async {
1857 /// # let homeserver = Url::parse("http://localhost:8080")?;
1858 /// # let mut client = Client::new(homeserver).await?;
1859 /// let response = client.devices().await?;
1860 ///
1861 /// for device in response.devices {
1862 /// println!(
1863 /// "Device: {} {}",
1864 /// device.device_id,
1865 /// device.display_name.as_deref().unwrap_or("")
1866 /// );
1867 /// }
1868 /// # anyhow::Ok(()) };
1869 /// ```
1870 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
1871 let request = get_devices::v3::Request::new();
1872
1873 self.send(request).await
1874 }
1875
1876 /// Delete the given devices from the server.
1877 ///
1878 /// # Arguments
1879 ///
1880 /// * `devices` - The list of devices that should be deleted from the
1881 /// server.
1882 ///
1883 /// * `auth_data` - This request requires user interactive auth, the first
1884 /// request needs to set this to `None` and will always fail with an
1885 /// `UiaaResponse`. The response will contain information for the
1886 /// interactive auth and the same request needs to be made but this time
1887 /// with some `auth_data` provided.
1888 ///
1889 /// ```no_run
1890 /// # use matrix_sdk::{
1891 /// # ruma::{api::client::uiaa, device_id},
1892 /// # Client, Error, config::SyncSettings,
1893 /// # };
1894 /// # use serde_json::json;
1895 /// # use url::Url;
1896 /// # use std::collections::BTreeMap;
1897 /// # async {
1898 /// # let homeserver = Url::parse("http://localhost:8080")?;
1899 /// # let mut client = Client::new(homeserver).await?;
1900 /// let devices = &[device_id!("DEVICEID").to_owned()];
1901 ///
1902 /// if let Err(e) = client.delete_devices(devices, None).await {
1903 /// if let Some(info) = e.as_uiaa_response() {
1904 /// let mut password = uiaa::Password::new(
1905 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1906 /// "wordpass".to_owned(),
1907 /// );
1908 /// password.session = info.session.clone();
1909 ///
1910 /// client
1911 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
1912 /// .await?;
1913 /// }
1914 /// }
1915 /// # anyhow::Ok(()) };
1916 pub async fn delete_devices(
1917 &self,
1918 devices: &[OwnedDeviceId],
1919 auth_data: Option<uiaa::AuthData>,
1920 ) -> HttpResult<delete_devices::v3::Response> {
1921 let mut request = delete_devices::v3::Request::new(devices.to_owned());
1922 request.auth = auth_data;
1923
1924 self.send(request).await
1925 }
1926
1927 /// Change the display name of a device owned by the current user.
1928 ///
1929 /// Returns a `update_device::Response` which specifies the result
1930 /// of the operation.
1931 ///
1932 /// # Arguments
1933 ///
1934 /// * `device_id` - The ID of the device to change the display name of.
1935 /// * `display_name` - The new display name to set.
1936 pub async fn rename_device(
1937 &self,
1938 device_id: &DeviceId,
1939 display_name: &str,
1940 ) -> HttpResult<update_device::v3::Response> {
1941 let mut request = update_device::v3::Request::new(device_id.to_owned());
1942 request.display_name = Some(display_name.to_owned());
1943
1944 self.send(request).await
1945 }
1946
1947 /// Synchronize the client's state with the latest state on the server.
1948 ///
1949 /// ## Syncing Events
1950 ///
1951 /// Messages or any other type of event need to be periodically fetched from
1952 /// the server, this is achieved by sending a `/sync` request to the server.
1953 ///
1954 /// The first sync is sent out without a [`token`]. The response of the
1955 /// first sync will contain a [`next_batch`] field which should then be
1956 /// used in the subsequent sync calls as the [`token`]. This ensures that we
1957 /// don't receive the same events multiple times.
1958 ///
1959 /// ## Long Polling
1960 ///
1961 /// A sync should in the usual case always be in flight. The
1962 /// [`SyncSettings`] have a [`timeout`] option, which controls how
1963 /// long the server will wait for new events before it will respond.
1964 /// The server will respond immediately if some new events arrive before the
1965 /// timeout has expired. If no changes arrive and the timeout expires an
1966 /// empty sync response will be sent to the client.
1967 ///
1968 /// This method of sending a request that may not receive a response
1969 /// immediately is called long polling.
1970 ///
1971 /// ## Filtering Events
1972 ///
1973 /// The number or type of messages and events that the client should receive
1974 /// from the server can be altered using a [`Filter`].
1975 ///
1976 /// Filters can be non-trivial and, since they will be sent with every sync
1977 /// request, they may take up a bunch of unnecessary bandwidth.
1978 ///
1979 /// Luckily filters can be uploaded to the server and reused using an unique
1980 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
1981 /// method.
1982 ///
1983 /// # Arguments
1984 ///
1985 /// * `sync_settings` - Settings for the sync call, this allows us to set
1986 /// various options to configure the sync:
1987 /// * [`filter`] - To configure which events we receive and which get
1988 /// [filtered] by the server
1989 /// * [`timeout`] - To configure our [long polling] setup.
1990 /// * [`token`] - To tell the server which events we already received
1991 /// and where we wish to continue syncing.
1992 /// * [`full_state`] - To tell the server that we wish to receive all
1993 /// state events, regardless of our configured [`token`].
1994 /// * [`set_presence`] - To tell the server to set the presence and to
1995 /// which state.
1996 ///
1997 /// # Examples
1998 ///
1999 /// ```no_run
2000 /// # use url::Url;
2001 /// # async {
2002 /// # let homeserver = Url::parse("http://localhost:8080")?;
2003 /// # let username = "";
2004 /// # let password = "";
2005 /// use matrix_sdk::{
2006 /// config::SyncSettings,
2007 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2008 /// };
2009 ///
2010 /// let client = Client::new(homeserver).await?;
2011 /// client.matrix_auth().login_username(username, password).send().await?;
2012 ///
2013 /// // Sync once so we receive the client state and old messages.
2014 /// client.sync_once(SyncSettings::default()).await?;
2015 ///
2016 /// // Register our handler so we start responding once we receive a new
2017 /// // event.
2018 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2019 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2020 /// });
2021 ///
2022 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2023 /// // from our `sync_once()` call automatically.
2024 /// client.sync(SyncSettings::default()).await;
2025 /// # anyhow::Ok(()) };
2026 /// ```
2027 ///
2028 /// [`sync`]: #method.sync
2029 /// [`SyncSettings`]: crate::config::SyncSettings
2030 /// [`token`]: crate::config::SyncSettings#method.token
2031 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2032 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2033 /// [`set_presence`]: ruma::presence::PresenceState
2034 /// [`filter`]: crate::config::SyncSettings#method.filter
2035 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2036 /// [`next_batch`]: SyncResponse#structfield.next_batch
2037 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2038 /// [long polling]: #long-polling
2039 /// [filtered]: #filtering-events
2040 #[instrument(skip(self))]
2041 pub async fn sync_once(
2042 &self,
2043 sync_settings: crate::config::SyncSettings,
2044 ) -> Result<SyncResponse> {
2045 // The sync might not return for quite a while due to the timeout.
2046 // We'll see if there's anything crypto related to send out before we
2047 // sync, i.e. if we closed our client after a sync but before the
2048 // crypto requests were sent out.
2049 //
2050 // This will mostly be a no-op.
2051 #[cfg(feature = "e2e-encryption")]
2052 if let Err(e) = self.send_outgoing_requests().await {
2053 error!(error = ?e, "Error while sending outgoing E2EE requests");
2054 }
2055
2056 let request = assign!(sync_events::v3::Request::new(), {
2057 filter: sync_settings.filter.map(|f| *f),
2058 since: sync_settings.token,
2059 full_state: sync_settings.full_state,
2060 set_presence: sync_settings.set_presence,
2061 timeout: sync_settings.timeout,
2062 });
2063 let mut request_config = self.request_config();
2064 if let Some(timeout) = sync_settings.timeout {
2065 request_config.timeout += timeout;
2066 }
2067
2068 let response = self.send(request).with_request_config(request_config).await?;
2069 let next_batch = response.next_batch.clone();
2070 let response = self.process_sync(response).await?;
2071
2072 #[cfg(feature = "e2e-encryption")]
2073 if let Err(e) = self.send_outgoing_requests().await {
2074 error!(error = ?e, "Error while sending outgoing E2EE requests");
2075 }
2076
2077 self.inner.sync_beat.notify(usize::MAX);
2078
2079 Ok(SyncResponse::new(next_batch, response))
2080 }
2081
2082 /// Repeatedly synchronize the client state with the server.
2083 ///
2084 /// This method will only return on error, if cancellation is needed
2085 /// the method should be wrapped in a cancelable task or the
2086 /// [`Client::sync_with_callback`] method can be used or
2087 /// [`Client::sync_with_result_callback`] if you want to handle error
2088 /// cases in the loop, too.
2089 ///
2090 /// This method will internally call [`Client::sync_once`] in a loop.
2091 ///
2092 /// This method can be used with the [`Client::add_event_handler`]
2093 /// method to react to individual events. If you instead wish to handle
2094 /// events in a bulk manner the [`Client::sync_with_callback`],
2095 /// [`Client::sync_with_result_callback`] and
2096 /// [`Client::sync_stream`] methods can be used instead. Those methods
2097 /// repeatedly return the whole sync response.
2098 ///
2099 /// # Arguments
2100 ///
2101 /// * `sync_settings` - Settings for the sync call. *Note* that those
2102 /// settings will be only used for the first sync call. See the argument
2103 /// docs for [`Client::sync_once`] for more info.
2104 ///
2105 /// # Return
2106 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2107 /// up to the user of the API to check the error and decide whether the sync
2108 /// should continue or not.
2109 ///
2110 /// # Examples
2111 ///
2112 /// ```no_run
2113 /// # use url::Url;
2114 /// # async {
2115 /// # let homeserver = Url::parse("http://localhost:8080")?;
2116 /// # let username = "";
2117 /// # let password = "";
2118 /// use matrix_sdk::{
2119 /// config::SyncSettings,
2120 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2121 /// };
2122 ///
2123 /// let client = Client::new(homeserver).await?;
2124 /// client.matrix_auth().login_username(&username, &password).send().await?;
2125 ///
2126 /// // Register our handler so we start responding once we receive a new
2127 /// // event.
2128 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2129 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2130 /// });
2131 ///
2132 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2133 /// // automatically.
2134 /// client.sync(SyncSettings::default()).await?;
2135 /// # anyhow::Ok(()) };
2136 /// ```
2137 ///
2138 /// [argument docs]: #method.sync_once
2139 /// [`sync_with_callback`]: #method.sync_with_callback
2140 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2141 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2142 }
2143
2144 /// Repeatedly call sync to synchronize the client state with the server.
2145 ///
2146 /// # Arguments
2147 ///
2148 /// * `sync_settings` - Settings for the sync call. *Note* that those
2149 /// settings will be only used for the first sync call. See the argument
2150 /// docs for [`Client::sync_once`] for more info.
2151 ///
2152 /// * `callback` - A callback that will be called every time a successful
2153 /// response has been fetched from the server. The callback must return a
2154 /// boolean which signalizes if the method should stop syncing. If the
2155 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2156 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2157 ///
2158 /// # Return
2159 /// The sync runs until an error occurs or the
2160 /// callback indicates that the Loop should stop. If the callback asked for
2161 /// a regular stop, the result will be `Ok(())` otherwise the
2162 /// `Err(Error)` is returned.
2163 ///
2164 /// # Examples
2165 ///
2166 /// The following example demonstrates how to sync forever while sending all
2167 /// the interesting events through a mpsc channel to another thread e.g. a
2168 /// UI thread.
2169 ///
2170 /// ```no_run
2171 /// # use std::time::Duration;
2172 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2173 /// # use url::Url;
2174 /// # async {
2175 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2176 /// # let mut client = Client::new(homeserver).await.unwrap();
2177 ///
2178 /// use tokio::sync::mpsc::channel;
2179 ///
2180 /// let (tx, rx) = channel(100);
2181 ///
2182 /// let sync_channel = &tx;
2183 /// let sync_settings = SyncSettings::new()
2184 /// .timeout(Duration::from_secs(30));
2185 ///
2186 /// client
2187 /// .sync_with_callback(sync_settings, |response| async move {
2188 /// let channel = sync_channel;
2189 /// for (room_id, room) in response.rooms.join {
2190 /// for event in room.timeline.events {
2191 /// channel.send(event).await.unwrap();
2192 /// }
2193 /// }
2194 ///
2195 /// LoopCtrl::Continue
2196 /// })
2197 /// .await;
2198 /// };
2199 /// ```
2200 #[instrument(skip_all)]
2201 pub async fn sync_with_callback<C>(
2202 &self,
2203 sync_settings: crate::config::SyncSettings,
2204 callback: impl Fn(SyncResponse) -> C,
2205 ) -> Result<(), Error>
2206 where
2207 C: Future<Output = LoopCtrl>,
2208 {
2209 self.sync_with_result_callback(sync_settings, |result| async {
2210 Ok(callback(result?).await)
2211 })
2212 .await
2213 }
2214
2215 /// Repeatedly call sync to synchronize the client state with the server.
2216 ///
2217 /// # Arguments
2218 ///
2219 /// * `sync_settings` - Settings for the sync call. *Note* that those
2220 /// settings will be only used for the first sync call. See the argument
2221 /// docs for [`Client::sync_once`] for more info.
2222 ///
2223 /// * `callback` - A callback that will be called every time after a
2224 /// response has been received, failure or not. The callback returns a
2225 /// `Result<LoopCtrl, Error>`, too. When returning
2226 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2227 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2228 /// function returns `Ok(())`. In case the callback can't handle the
2229 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2230 /// which results in the sync ending and the `Err(Error)` being returned.
2231 ///
2232 /// # Return
2233 /// The sync runs until an error occurs that the callback can't handle or
2234 /// the callback indicates that the Loop should stop. If the callback
2235 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2236 /// `Err(Error)` is returned.
2237 ///
2238 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2239 /// this, and are handled first without sending the result to the
2240 /// callback. Only after they have exceeded is the `Result` handed to
2241 /// the callback.
2242 ///
2243 /// # Examples
2244 ///
2245 /// The following example demonstrates how to sync forever while sending all
2246 /// the interesting events through a mpsc channel to another thread e.g. a
2247 /// UI thread.
2248 ///
2249 /// ```no_run
2250 /// # use std::time::Duration;
2251 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2252 /// # use url::Url;
2253 /// # async {
2254 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2255 /// # let mut client = Client::new(homeserver).await.unwrap();
2256 /// #
2257 /// use tokio::sync::mpsc::channel;
2258 ///
2259 /// let (tx, rx) = channel(100);
2260 ///
2261 /// let sync_channel = &tx;
2262 /// let sync_settings = SyncSettings::new()
2263 /// .timeout(Duration::from_secs(30));
2264 ///
2265 /// client
2266 /// .sync_with_result_callback(sync_settings, |response| async move {
2267 /// let channel = sync_channel;
2268 /// let sync_response = response?;
2269 /// for (room_id, room) in sync_response.rooms.join {
2270 /// for event in room.timeline.events {
2271 /// channel.send(event).await.unwrap();
2272 /// }
2273 /// }
2274 ///
2275 /// Ok(LoopCtrl::Continue)
2276 /// })
2277 /// .await;
2278 /// };
2279 /// ```
2280 #[instrument(skip(self, callback))]
2281 pub async fn sync_with_result_callback<C>(
2282 &self,
2283 mut sync_settings: crate::config::SyncSettings,
2284 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2285 ) -> Result<(), Error>
2286 where
2287 C: Future<Output = Result<LoopCtrl, Error>>,
2288 {
2289 let mut last_sync_time: Option<Instant> = None;
2290
2291 if sync_settings.token.is_none() {
2292 sync_settings.token = self.sync_token().await;
2293 }
2294
2295 loop {
2296 trace!("Syncing");
2297 let result = self.sync_loop_helper(&mut sync_settings).await;
2298
2299 trace!("Running callback");
2300 if callback(result).await? == LoopCtrl::Break {
2301 trace!("Callback told us to stop");
2302 break;
2303 }
2304 trace!("Done running callback");
2305
2306 Client::delay_sync(&mut last_sync_time).await
2307 }
2308
2309 Ok(())
2310 }
2311
2312 //// Repeatedly synchronize the client state with the server.
2313 ///
2314 /// This method will internally call [`Client::sync_once`] in a loop and is
2315 /// equivalent to the [`Client::sync`] method but the responses are provided
2316 /// as an async stream.
2317 ///
2318 /// # Arguments
2319 ///
2320 /// * `sync_settings` - Settings for the sync call. *Note* that those
2321 /// settings will be only used for the first sync call. See the argument
2322 /// docs for [`Client::sync_once`] for more info.
2323 ///
2324 /// # Examples
2325 ///
2326 /// ```no_run
2327 /// # use url::Url;
2328 /// # async {
2329 /// # let homeserver = Url::parse("http://localhost:8080")?;
2330 /// # let username = "";
2331 /// # let password = "";
2332 /// use futures_util::StreamExt;
2333 /// use matrix_sdk::{config::SyncSettings, Client};
2334 ///
2335 /// let client = Client::new(homeserver).await?;
2336 /// client.matrix_auth().login_username(&username, &password).send().await?;
2337 ///
2338 /// let mut sync_stream =
2339 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2340 ///
2341 /// while let Some(Ok(response)) = sync_stream.next().await {
2342 /// for room in response.rooms.join.values() {
2343 /// for e in &room.timeline.events {
2344 /// if let Ok(event) = e.raw().deserialize() {
2345 /// println!("Received event {:?}", event);
2346 /// }
2347 /// }
2348 /// }
2349 /// }
2350 ///
2351 /// # anyhow::Ok(()) };
2352 /// ```
2353 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2354 #[instrument(skip(self))]
2355 pub async fn sync_stream(
2356 &self,
2357 mut sync_settings: crate::config::SyncSettings,
2358 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2359 let mut last_sync_time: Option<Instant> = None;
2360
2361 if sync_settings.token.is_none() {
2362 sync_settings.token = self.sync_token().await;
2363 }
2364
2365 let parent_span = Span::current();
2366
2367 async_stream::stream! {
2368 loop {
2369 yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2370
2371 Client::delay_sync(&mut last_sync_time).await
2372 }
2373 }
2374 }
2375
2376 /// Get the current, if any, sync token of the client.
2377 /// This will be None if the client didn't sync at least once.
2378 pub(crate) async fn sync_token(&self) -> Option<String> {
2379 self.inner.base_client.sync_token().await
2380 }
2381
2382 /// Gets information about the owner of a given access token.
2383 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2384 let request = whoami::v3::Request::new();
2385 self.send(request).await
2386 }
2387
2388 /// Subscribes a new receiver to client SessionChange broadcasts.
2389 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2390 let broadcast = &self.auth_ctx().session_change_sender;
2391 broadcast.subscribe()
2392 }
2393
2394 /// Sets the save/restore session callbacks.
2395 ///
2396 /// This is another mechanism to get synchronous updates to session tokens,
2397 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2398 pub fn set_session_callbacks(
2399 &self,
2400 reload_session_callback: Box<ReloadSessionCallback>,
2401 save_session_callback: Box<SaveSessionCallback>,
2402 ) -> Result<()> {
2403 self.inner
2404 .auth_ctx
2405 .reload_session_callback
2406 .set(reload_session_callback)
2407 .map_err(|_| Error::MultipleSessionCallbacks)?;
2408
2409 self.inner
2410 .auth_ctx
2411 .save_session_callback
2412 .set(save_session_callback)
2413 .map_err(|_| Error::MultipleSessionCallbacks)?;
2414
2415 Ok(())
2416 }
2417
2418 /// Get the notification settings of the current owner of the client.
2419 pub async fn notification_settings(&self) -> NotificationSettings {
2420 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2421 NotificationSettings::new(self.clone(), ruleset)
2422 }
2423
2424 /// Create a new specialized `Client` that can process notifications.
2425 ///
2426 /// See [`CrossProcessStoreLock::new`] to learn more about
2427 /// `cross_process_store_locks_holder_name`.
2428 ///
2429 /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2430 pub async fn notification_client(
2431 &self,
2432 cross_process_store_locks_holder_name: String,
2433 ) -> Result<Client> {
2434 let client = Client {
2435 inner: ClientInner::new(
2436 self.inner.auth_ctx.clone(),
2437 self.server().cloned(),
2438 self.homeserver(),
2439 self.sliding_sync_version(),
2440 self.inner.http_client.clone(),
2441 self.inner
2442 .base_client
2443 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2444 .await?,
2445 self.inner.caches.server_capabilities.read().await.clone(),
2446 self.inner.respect_login_well_known,
2447 self.inner.event_cache.clone(),
2448 self.inner.send_queue_data.clone(),
2449 #[cfg(feature = "e2e-encryption")]
2450 self.inner.e2ee.encryption_settings,
2451 cross_process_store_locks_holder_name,
2452 )
2453 .await,
2454 };
2455
2456 Ok(client)
2457 }
2458
2459 /// The [`EventCache`] instance for this [`Client`].
2460 pub fn event_cache(&self) -> &EventCache {
2461 // SAFETY: always initialized in the `Client` ctor.
2462 self.inner.event_cache.get().unwrap()
2463 }
2464
2465 /// Waits until an at least partially synced room is received, and returns
2466 /// it.
2467 ///
2468 /// **Note: this function will loop endlessly until either it finds the room
2469 /// or an externally set timeout happens.**
2470 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2471 loop {
2472 if let Some(room) = self.get_room(room_id) {
2473 if room.is_state_partially_or_fully_synced() {
2474 debug!("Found just created room!");
2475 return room;
2476 }
2477 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2478 } else {
2479 debug!("Room wasn't found, waiting for sync beat to try again");
2480 }
2481 self.inner.sync_beat.listen().await;
2482 }
2483 }
2484
2485 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2486 /// join it.
2487 pub async fn knock(
2488 &self,
2489 room_id_or_alias: OwnedRoomOrAliasId,
2490 reason: Option<String>,
2491 server_names: Vec<OwnedServerName>,
2492 ) -> Result<Room> {
2493 let request =
2494 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2495 let response = self.send(request).await?;
2496 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2497 Ok(Room::new(self.clone(), base_room))
2498 }
2499}
2500
2501/// A weak reference to the inner client, useful when trying to get a handle
2502/// on the owning client.
2503#[derive(Clone)]
2504pub(crate) struct WeakClient {
2505 client: Weak<ClientInner>,
2506}
2507
2508impl WeakClient {
2509 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2510 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2511 Self { client: Arc::downgrade(client) }
2512 }
2513
2514 /// Construct a [`WeakClient`] from a [`Client`].
2515 pub fn from_client(client: &Client) -> Self {
2516 Self::from_inner(&client.inner)
2517 }
2518
2519 /// Attempts to get a [`Client`] from this [`WeakClient`].
2520 pub fn get(&self) -> Option<Client> {
2521 self.client.upgrade().map(|inner| Client { inner })
2522 }
2523
2524 /// Gets the number of strong (`Arc`) pointers still pointing to this
2525 /// client.
2526 #[allow(dead_code)]
2527 pub fn strong_count(&self) -> usize {
2528 self.client.strong_count()
2529 }
2530}
2531
/// In-memory cache of what the homeserver reported about itself.
///
/// Each field is `None` until it has been loaded, and is set back to `None`
/// by `Client::reset_server_capabilities`.
#[derive(Clone)]
struct ClientServerCapabilities {
    /// The Matrix versions the server supports (well-known ones only).
    server_versions: Option<Box<[MatrixVersion]>>,

    /// The unstable features and their on/off state on the server.
    unstable_features: Option<BTreeMap<String, bool>>,
}
2540
2541// The http mocking library is not supported for wasm32
2542#[cfg(all(test, not(target_arch = "wasm32")))]
2543pub(crate) mod tests {
2544 use std::{sync::Arc, time::Duration};
2545
2546 use assert_matches::assert_matches;
2547 use futures_util::FutureExt;
2548 use matrix_sdk_base::{
2549 store::{MemoryStore, StoreConfig},
2550 RoomState,
2551 };
2552 use matrix_sdk_test::{
2553 async_test, test_json, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder,
2554 DEFAULT_TEST_ROOM_ID,
2555 };
2556 #[cfg(target_arch = "wasm32")]
2557 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2558
2559 use ruma::{
2560 api::{client::room::create_room::v3::Request as CreateRoomRequest, MatrixVersion},
2561 assign,
2562 events::ignored_user_list::IgnoredUserListEventContent,
2563 owned_room_id, room_alias_id, room_id, RoomId, ServerName, UserId,
2564 };
2565 use serde_json::json;
2566 use tokio::{
2567 spawn,
2568 time::{sleep, timeout},
2569 };
2570 use url::Url;
2571 use wiremock::{
2572 matchers::{body_json, header, method, path, query_param_is_missing},
2573 Mock, MockServer, ResponseTemplate,
2574 };
2575
2576 use super::Client;
2577 use crate::{
2578 client::WeakClient,
2579 config::{RequestConfig, SyncSettings},
2580 test_utils::{
2581 logged_in_client, mocks::MatrixMockServer, no_retry_test_client, set_client_session,
2582 test_client_builder, test_client_builder_with_server,
2583 },
2584 Error,
2585 };
2586
2587 #[async_test]
2588 async fn test_account_data() {
2589 let server = MockServer::start().await;
2590 let client = logged_in_client(Some(server.uri())).await;
2591
2592 Mock::given(method("GET"))
2593 .and(path("/_matrix/client/r0/sync".to_owned()))
2594 .and(header("authorization", "Bearer 1234"))
2595 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::SYNC))
2596 .mount(&server)
2597 .await;
2598
2599 let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
2600 let _response = client.sync_once(sync_settings).await.unwrap();
2601
2602 let content = client
2603 .account()
2604 .account_data::<IgnoredUserListEventContent>()
2605 .await
2606 .unwrap()
2607 .unwrap()
2608 .deserialize()
2609 .unwrap();
2610
2611 assert_eq!(content.ignored_users.len(), 1);
2612 }
2613
2614 #[async_test]
2615 async fn test_successful_discovery() {
2616 // Imagine this is `matrix.org`.
2617 let server = MockServer::start().await;
2618
2619 // Imagine this is `matrix-client.matrix.org`.
2620 let homeserver = MockServer::start().await;
2621
2622 // Imagine Alice has the user ID `@alice:matrix.org`.
2623 let server_url = server.uri();
2624 let domain = server_url.strip_prefix("http://").unwrap();
2625 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2626
2627 // The `.well-known` is on the server (e.g. `matrix.org`).
2628 Mock::given(method("GET"))
2629 .and(path("/.well-known/matrix/client"))
2630 .respond_with(ResponseTemplate::new(200).set_body_raw(
2631 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", &homeserver.uri()),
2632 "application/json",
2633 ))
2634 .mount(&server)
2635 .await;
2636
2637 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
2638 Mock::given(method("GET"))
2639 .and(path("/_matrix/client/versions"))
2640 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2641 .mount(&homeserver)
2642 .await;
2643
2644 let client = Client::builder()
2645 .insecure_server_name_no_tls(alice.server_name())
2646 .build()
2647 .await
2648 .unwrap();
2649
2650 assert_eq!(client.server().unwrap(), &Url::parse(&server.uri()).unwrap());
2651 assert_eq!(client.homeserver(), Url::parse(&homeserver.uri()).unwrap());
2652 }
2653
2654 #[async_test]
2655 async fn test_discovery_broken_server() {
2656 let server = MockServer::start().await;
2657 let server_url = server.uri();
2658 let domain = server_url.strip_prefix("http://").unwrap();
2659 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2660
2661 Mock::given(method("GET"))
2662 .and(path("/.well-known/matrix/client"))
2663 .respond_with(ResponseTemplate::new(404))
2664 .mount(&server)
2665 .await;
2666
2667 assert!(
2668 Client::builder()
2669 .insecure_server_name_no_tls(alice.server_name())
2670 .build()
2671 .await
2672 .is_err(),
2673 "Creating a client from a user ID should fail when the .well-known request fails."
2674 );
2675 }
2676
2677 #[async_test]
2678 async fn test_room_creation() {
2679 let server = MockServer::start().await;
2680 let client = logged_in_client(Some(server.uri())).await;
2681
2682 let response = SyncResponseBuilder::default()
2683 .add_joined_room(
2684 JoinedRoomBuilder::default()
2685 .add_state_event(StateTestEvent::Member)
2686 .add_state_event(StateTestEvent::PowerLevels),
2687 )
2688 .build_sync_response();
2689
2690 client.inner.base_client.receive_sync_response(response).await.unwrap();
2691
2692 assert_eq!(client.homeserver(), Url::parse(&server.uri()).unwrap());
2693
2694 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
2695 assert_eq!(room.state(), RoomState::Joined);
2696 }
2697
2698 #[async_test]
2699 async fn test_retry_limit_http_requests() {
2700 let server = MockServer::start().await;
2701 let client = test_client_builder(Some(server.uri()))
2702 .request_config(RequestConfig::new().retry_limit(3))
2703 .build()
2704 .await
2705 .unwrap();
2706
2707 assert!(client.request_config().retry_limit.unwrap() == 3);
2708
2709 Mock::given(method("POST"))
2710 .and(path("/_matrix/client/r0/login"))
2711 .respond_with(ResponseTemplate::new(501))
2712 .expect(3)
2713 .mount(&server)
2714 .await;
2715
2716 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2717 }
2718
2719 #[async_test]
2720 async fn test_retry_timeout_http_requests() {
2721 // Keep this timeout small so that the test doesn't take long
2722 let retry_timeout = Duration::from_secs(5);
2723 let server = MockServer::start().await;
2724 let client = test_client_builder(Some(server.uri()))
2725 .request_config(RequestConfig::new().retry_timeout(retry_timeout))
2726 .build()
2727 .await
2728 .unwrap();
2729
2730 assert!(client.request_config().retry_timeout.unwrap() == retry_timeout);
2731
2732 Mock::given(method("POST"))
2733 .and(path("/_matrix/client/r0/login"))
2734 .respond_with(ResponseTemplate::new(501))
2735 .expect(2..)
2736 .mount(&server)
2737 .await;
2738
2739 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2740 }
2741
2742 #[async_test]
2743 async fn test_short_retry_initial_http_requests() {
2744 let server = MockServer::start().await;
2745 let client = test_client_builder(Some(server.uri())).build().await.unwrap();
2746
2747 Mock::given(method("POST"))
2748 .and(path("/_matrix/client/r0/login"))
2749 .respond_with(ResponseTemplate::new(501))
2750 .expect(3..)
2751 .mount(&server)
2752 .await;
2753
2754 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2755 }
2756
2757 #[async_test]
2758 async fn test_no_retry_http_requests() {
2759 let server = MockServer::start().await;
2760 let client = logged_in_client(Some(server.uri())).await;
2761
2762 Mock::given(method("GET"))
2763 .and(path("/_matrix/client/r0/devices"))
2764 .respond_with(ResponseTemplate::new(501))
2765 .expect(1)
2766 .mount(&server)
2767 .await;
2768
2769 client.devices().await.unwrap_err();
2770 }
2771
2772 #[async_test]
2773 async fn test_set_homeserver() {
2774 let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2775 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
2776
2777 let homeserver = Url::parse("http://example.com/").unwrap();
2778 client.set_homeserver(homeserver.clone());
2779 assert_eq!(client.homeserver(), homeserver);
2780 }
2781
2782 #[async_test]
2783 async fn test_search_user_request() {
2784 let server = MockServer::start().await;
2785 let client = logged_in_client(Some(server.uri())).await;
2786
2787 Mock::given(method("POST"))
2788 .and(path("_matrix/client/r0/user_directory/search"))
2789 .and(body_json(&*test_json::search_users::SEARCH_USERS_REQUEST))
2790 .respond_with(
2791 ResponseTemplate::new(200)
2792 .set_body_json(&*test_json::search_users::SEARCH_USERS_RESPONSE),
2793 )
2794 .mount(&server)
2795 .await;
2796
2797 let response = client.search_users("test", 50).await.unwrap();
2798 assert_eq!(response.results.len(), 1);
2799 let result = &response.results[0];
2800 assert_eq!(result.user_id.to_string(), "@test:example.me");
2801 assert_eq!(result.display_name.clone().unwrap(), "Test");
2802 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
2803 assert!(!response.limited);
2804 }
2805
2806 #[async_test]
2807 async fn test_request_unstable_features() {
2808 let server = MockServer::start().await;
2809 let client = logged_in_client(Some(server.uri())).await;
2810
2811 Mock::given(method("GET"))
2812 .and(path("_matrix/client/versions"))
2813 .respond_with(
2814 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2815 )
2816 .mount(&server)
2817 .await;
2818
2819 let unstable_features = client.unstable_features().await.unwrap();
2820 assert_eq!(unstable_features.get("org.matrix.e2e_cross_signing"), Some(&true));
2821 assert_eq!(unstable_features.get("you.shall.pass"), None);
2822 }
2823
2824 #[async_test]
2825 async fn test_can_homeserver_push_encrypted_event_to_device() {
2826 let server = MockServer::start().await;
2827 let client = logged_in_client(Some(server.uri())).await;
2828
2829 Mock::given(method("GET"))
2830 .and(path("_matrix/client/versions"))
2831 .respond_with(
2832 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2833 )
2834 .mount(&server)
2835 .await;
2836
2837 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
2838 assert!(msc4028_enabled);
2839 }
2840
2841 #[async_test]
2842 async fn test_recently_visited_rooms() {
2843 // Tracking recently visited rooms requires authentication
2844 let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2845 assert_matches!(
2846 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
2847 Err(Error::AuthenticationRequired)
2848 );
2849
2850 let client = logged_in_client(None).await;
2851 let account = client.account();
2852
2853 // We should start off with an empty list
2854 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
2855
2856 // Tracking a valid room id should add it to the list
2857 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2858 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2859 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2860
2861 // And the existing list shouldn't be changed
2862 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2863 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2864
2865 // Tracking the same room again shouldn't change the list
2866 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2867 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2868 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2869
2870 // Tracking a second room should add it to the front of the list
2871 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
2872 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
2873 assert_eq!(
2874 account.get_recently_visited_rooms().await.unwrap(),
2875 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
2876 );
2877
2878 // Tracking the first room yet again should move it to the front of the list
2879 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2880 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
2881 assert_eq!(
2882 account.get_recently_visited_rooms().await.unwrap(),
2883 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
2884 );
2885
2886 // Tracking should be capped at 20
2887 for n in 0..20 {
2888 account
2889 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
2890 .await
2891 .unwrap();
2892 }
2893
2894 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
2895
2896 // And the initial rooms should've been pushed out
2897 let rooms = account.get_recently_visited_rooms().await.unwrap();
2898 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
2899 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
2900
2901 // And the last tracked room should be the first
2902 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
2903 }
2904
2905 #[async_test]
2906 async fn test_client_no_cycle_with_event_cache() {
2907 let client = logged_in_client(None).await;
2908
2909 // Wait for the init tasks to die.
2910 sleep(Duration::from_secs(1)).await;
2911
2912 let weak_client = WeakClient::from_client(&client);
2913 assert_eq!(weak_client.strong_count(), 1);
2914
2915 {
2916 let room_id = room_id!("!room:example.org");
2917
2918 // Have the client know the room.
2919 let response = SyncResponseBuilder::default()
2920 .add_joined_room(JoinedRoomBuilder::new(room_id))
2921 .build_sync_response();
2922 client.inner.base_client.receive_sync_response(response).await.unwrap();
2923
2924 client.event_cache().subscribe().unwrap();
2925
2926 let (_room_event_cache, _drop_handles) =
2927 client.get_room(room_id).unwrap().event_cache().await.unwrap();
2928 }
2929
2930 drop(client);
2931
2932 // Give a bit of time for background tasks to die.
2933 sleep(Duration::from_secs(1)).await;
2934
2935 // The weak client must be the last reference to the client now.
2936 assert_eq!(weak_client.strong_count(), 0);
2937 let client = weak_client.get();
2938 assert!(
2939 client.is_none(),
2940 "too many strong references to the client: {}",
2941 Arc::strong_count(&client.unwrap().inner)
2942 );
2943 }
2944
2945 #[async_test]
2946 async fn test_server_capabilities_caching() {
2947 let server = MockServer::start().await;
2948 let server_url = server.uri();
2949 let domain = server_url.strip_prefix("http://").unwrap();
2950 let server_name = <&ServerName>::try_from(domain).unwrap();
2951
2952 Mock::given(method("GET"))
2953 .and(path("/.well-known/matrix/client"))
2954 .respond_with(ResponseTemplate::new(200).set_body_raw(
2955 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", server_url.as_ref()),
2956 "application/json",
2957 ))
2958 .named("well known mock")
2959 .expect(2)
2960 .mount(&server)
2961 .await;
2962
2963 let versions_mock = Mock::given(method("GET"))
2964 .and(path("/_matrix/client/versions"))
2965 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2966 .named("first versions mock")
2967 .expect(1)
2968 .mount_as_scoped(&server)
2969 .await;
2970
2971 let memory_store = Arc::new(MemoryStore::new());
2972 let client = Client::builder()
2973 .insecure_server_name_no_tls(server_name)
2974 .store_config(
2975 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
2976 .state_store(memory_store.clone()),
2977 )
2978 .build()
2979 .await
2980 .unwrap();
2981
2982 assert_eq!(client.server_versions().await.unwrap().len(), 1);
2983
2984 // This second call hits the in-memory cache.
2985 assert!(client
2986 .server_versions()
2987 .await
2988 .unwrap()
2989 .iter()
2990 .any(|version| *version == MatrixVersion::V1_0));
2991
2992 drop(client);
2993
2994 let client = Client::builder()
2995 .insecure_server_name_no_tls(server_name)
2996 .store_config(
2997 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
2998 .state_store(memory_store.clone()),
2999 )
3000 .build()
3001 .await
3002 .unwrap();
3003
3004 // This third call hits the on-disk cache.
3005 assert_eq!(
3006 client.unstable_features().await.unwrap().get("org.matrix.e2e_cross_signing"),
3007 Some(&true)
3008 );
3009
3010 drop(versions_mock);
3011 server.verify().await;
3012
3013 // Now, reset the cache, and observe the endpoint being called again once.
3014 client.reset_server_capabilities().await.unwrap();
3015
3016 Mock::given(method("GET"))
3017 .and(path("/_matrix/client/versions"))
3018 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3019 .expect(1)
3020 .named("second versions mock")
3021 .mount(&server)
3022 .await;
3023
3024 // Hits network again.
3025 assert_eq!(client.server_versions().await.unwrap().len(), 1);
3026 // Hits in-memory cache again.
3027 assert!(client
3028 .server_versions()
3029 .await
3030 .unwrap()
3031 .iter()
3032 .any(|version| *version == MatrixVersion::V1_0));
3033 }
3034
3035 #[async_test]
3036 async fn test_no_network_doesnt_cause_infinite_retries() {
3037 // Note: not `no_retry_test_client` or `logged_in_client` which uses the former,
3038 // since we want infinite retries for transient errors.
3039 let client =
3040 test_client_builder(None).request_config(RequestConfig::new()).build().await.unwrap();
3041 set_client_session(&client).await;
3042
3043 // We don't define a mock server on purpose here, so that the error is really a
3044 // network error.
3045 client.whoami().await.unwrap_err();
3046 }
3047
3048 #[async_test]
3049 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3050 let (client_builder, server) = test_client_builder_with_server().await;
3051 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3052 set_client_session(&client).await;
3053
3054 let builder = Mock::given(method("GET"))
3055 .and(path("/_matrix/client/r0/sync"))
3056 .and(header("authorization", "Bearer 1234"))
3057 .and(query_param_is_missing("since"));
3058
3059 let room_id = room_id!("!room:example.org");
3060 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3061 let mut sync_response_builder = SyncResponseBuilder::new();
3062 sync_response_builder.add_joined_room(joined_room_builder);
3063 let response_body = sync_response_builder.build_json_sync_response();
3064
3065 builder
3066 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3067 .mount(&server)
3068 .await;
3069
3070 client.sync_once(SyncSettings::default()).await.unwrap();
3071
3072 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3073 assert_eq!(room.room_id(), room_id);
3074 }
3075
3076 #[async_test]
3077 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3078 let (client_builder, server) = test_client_builder_with_server().await;
3079 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3080 set_client_session(&client).await;
3081
3082 let builder = Mock::given(method("GET"))
3083 .and(path("/_matrix/client/r0/sync"))
3084 .and(header("authorization", "Bearer 1234"))
3085 .and(query_param_is_missing("since"));
3086
3087 let room_id = room_id!("!room:example.org");
3088 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3089 let mut sync_response_builder = SyncResponseBuilder::new();
3090 sync_response_builder.add_joined_room(joined_room_builder);
3091 let response_body = sync_response_builder.build_json_sync_response();
3092
3093 builder
3094 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3095 .mount(&server)
3096 .await;
3097
3098 let client = Arc::new(client);
3099
3100 // Perform the /sync request with a delay so it starts after the
3101 // `await_room_remote_echo` call has happened
3102 spawn({
3103 let client = client.clone();
3104 async move {
3105 sleep(Duration::from_millis(100)).await;
3106 client.sync_once(SyncSettings::default()).await.unwrap();
3107 }
3108 });
3109
3110 let room =
3111 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3112 assert_eq!(room.room_id(), room_id);
3113 }
3114
3115 #[async_test]
3116 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3117 let (client_builder, _) = test_client_builder_with_server().await;
3118 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3119 set_client_session(&client).await;
3120
3121 let room_id = room_id!("!room:example.org");
3122 // Room is not present so the client won't be able to find it. The call will
3123 // timeout.
3124 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3125 }
3126
3127 #[async_test]
3128 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3129 let (client_builder, server) = test_client_builder_with_server().await;
3130 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3131 set_client_session(&client).await;
3132
3133 Mock::given(method("POST"))
3134 .and(path("_matrix/client/r0/createRoom"))
3135 .and(header("authorization", "Bearer 1234"))
3136 .respond_with(
3137 ResponseTemplate::new(200).set_body_json(json!({ "room_id": "!room:example.org"})),
3138 )
3139 .mount(&server)
3140 .await;
3141
3142 // Create a room in the internal store
3143 let room = client
3144 .create_room(assign!(CreateRoomRequest::new(), {
3145 invite: vec![],
3146 is_direct: false,
3147 }))
3148 .await
3149 .unwrap();
3150
3151 // Room is locally present, but not synced, the call will timeout
3152 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3153 .await
3154 .unwrap_err();
3155 }
3156
3157 #[async_test]
3158 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3159 let server = MatrixMockServer::new().await;
3160 let client = server.client_builder().build().await;
3161
3162 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3163
3164 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3165 assert_matches!(ret, Ok(true));
3166 }
3167
3168 #[async_test]
3169 async fn test_is_room_alias_available_if_alias_is_resolved() {
3170 let server = MatrixMockServer::new().await;
3171 let client = server.client_builder().build().await;
3172
3173 server
3174 .mock_room_directory_resolve_alias()
3175 .ok("!some_room_id:matrix.org", Vec::new())
3176 .expect(1)
3177 .mount()
3178 .await;
3179
3180 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3181 assert_matches!(ret, Ok(false));
3182 }
3183
3184 #[async_test]
3185 async fn test_is_room_alias_available_if_error_found() {
3186 let server = MatrixMockServer::new().await;
3187 let client = server.client_builder().build().await;
3188
3189 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3190
3191 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3192 assert_matches!(ret, Err(_));
3193 }
3194
3195 #[async_test]
3196 async fn test_create_room_alias() {
3197 let server = MatrixMockServer::new().await;
3198 let client = server.client_builder().build().await;
3199
3200 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3201
3202 let ret = client
3203 .create_room_alias(
3204 room_alias_id!("#some_alias:matrix.org"),
3205 room_id!("!some_room:matrix.org"),
3206 )
3207 .await;
3208 assert_matches!(ret, Ok(()));
3209 }
3210
3211 #[async_test]
3212 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3213 let server = MatrixMockServer::new().await;
3214 let client = server.client_builder().build().await;
3215
3216 let room_id = room_id!("!a-room:matrix.org");
3217
3218 // Make sure the summary endpoint is called once
3219 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3220
3221 // We create a locally cached invited room
3222 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3223
3224 // And we get a preview, the server endpoint was reached
3225 let preview = client
3226 .get_room_preview(room_id.into(), Vec::new())
3227 .await
3228 .expect("Room preview should be retrieved");
3229
3230 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3231 }
3232
3233 #[async_test]
3234 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3235 let server = MatrixMockServer::new().await;
3236 let client = server.client_builder().build().await;
3237
3238 let room_id = room_id!("!a-room:matrix.org");
3239
3240 // Make sure the summary endpoint is called once
3241 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3242
3243 // We create a locally cached left room
3244 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3245
3246 // And we get a preview, the server endpoint was reached
3247 let preview = client
3248 .get_room_preview(room_id.into(), Vec::new())
3249 .await
3250 .expect("Room preview should be retrieved");
3251
3252 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3253 }
3254
3255 #[async_test]
3256 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3257 let server = MatrixMockServer::new().await;
3258 let client = server.client_builder().build().await;
3259
3260 let room_id = room_id!("!a-room:matrix.org");
3261
3262 // Make sure the summary endpoint is called once
3263 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3264
3265 // We create a locally cached knocked room
3266 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3267
3268 // And we get a preview, the server endpoint was reached
3269 let preview = client
3270 .get_room_preview(room_id.into(), Vec::new())
3271 .await
3272 .expect("Room preview should be retrieved");
3273
3274 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3275 }
3276
3277 #[async_test]
3278 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3279 let server = MatrixMockServer::new().await;
3280 let client = server.client_builder().build().await;
3281
3282 let room_id = room_id!("!a-room:matrix.org");
3283
3284 // Make sure the summary endpoint is not called
3285 server.mock_room_summary().ok(room_id).never().mount().await;
3286
3287 // We create a locally cached joined room
3288 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3289
3290 // And we get a preview, no server endpoint was reached
3291 let preview = client
3292 .get_room_preview(room_id.into(), Vec::new())
3293 .await
3294 .expect("Room preview should be retrieved");
3295
3296 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3297 }
3298
3299 #[async_test]
3300 async fn test_room_preview_for_banned_room_retrieves_local_room_info() {
3301 let server = MatrixMockServer::new().await;
3302 let client = server.client_builder().build().await;
3303
3304 let room_id = room_id!("!a-room:matrix.org");
3305
3306 // Make sure the summary endpoint is not called
3307 server.mock_room_summary().ok(room_id).never().mount().await;
3308
3309 // We create a locally cached banned room
3310 let banned_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Banned);
3311
3312 // And we get a preview, no server endpoint was reached
3313 let preview = client
3314 .get_room_preview(room_id.into(), Vec::new())
3315 .await
3316 .expect("Room preview should be retrieved");
3317
3318 assert_eq!(banned_room.room_id().to_owned(), preview.room_id);
3319 }
3320}