matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
33use matrix_sdk_base::{
34 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
35 StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
36 event_cache::store::EventCacheStoreLock,
37 media::store::MediaStoreLock,
38 store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
39 sync::{Notification, RoomUpdates},
40};
41use matrix_sdk_common::ttl_cache::TtlCache;
42#[cfg(feature = "e2e-encryption")]
43use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
44use ruma::{
45 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
46 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
47 api::{
48 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
49 auth_scheme::{AuthScheme, SendAccessToken},
50 client::{
51 account::whoami,
52 alias::{create_alias, delete_alias, get_alias},
53 authenticated_media,
54 device::{self, delete_devices, get_devices, update_device},
55 directory::{get_public_rooms, get_public_rooms_filtered},
56 discovery::{
57 discover_homeserver::{self, RtcFocusInfo},
58 get_capabilities::{self, v3::Capabilities},
59 get_supported_versions,
60 },
61 error::ErrorKind,
62 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
63 knock::knock_room,
64 media,
65 membership::{join_room_by_id, join_room_by_id_or_alias},
66 room::create_room,
67 session::login::v3::DiscoveryInfo,
68 sync::sync_events,
69 threads::get_thread_subscriptions_changes,
70 uiaa,
71 user_directory::search_users,
72 },
73 error::FromHttpResponseError,
74 federation::discovery::get_server_version,
75 path_builder::PathBuilder,
76 },
77 assign,
78 events::direct::DirectUserIdentifier,
79 push::Ruleset,
80 time::Instant,
81};
82use serde::de::DeserializeOwned;
83use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
84use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
85use url::Url;
86
87use self::futures::SendRequest;
88use crate::{
89 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
90 Room, SessionTokens, TransmissionProgress,
91 authentication::{
92 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
93 oauth::OAuth,
94 },
95 client::thread_subscriptions::ThreadSubscriptionCatchup,
96 config::{RequestConfig, SyncToken},
97 deduplicating_handler::DeduplicatingHandler,
98 error::HttpResult,
99 event_cache::EventCache,
100 event_handler::{
101 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
102 EventHandlerStore, ObservableEventHandler, SyncEvent,
103 },
104 http_client::{HttpClient, SupportedPathBuilder},
105 latest_events::LatestEvents,
106 media::MediaError,
107 notification_settings::NotificationSettings,
108 room::RoomMember,
109 room_preview::RoomPreview,
110 send_queue::{SendQueue, SendQueueData},
111 sliding_sync::Version as SlidingSyncVersion,
112 sync::{RoomUpdate, SyncResponse},
113};
114#[cfg(feature = "e2e-encryption")]
115use crate::{
116 cross_process_lock::CrossProcessLock,
117 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
118};
119
120mod builder;
121pub(crate) mod caches;
122pub(crate) mod futures;
123pub(crate) mod thread_subscriptions;
124
125pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
126#[cfg(feature = "experimental-search")]
127use crate::search_index::SearchIndex;
128
129#[cfg(not(target_family = "wasm"))]
130type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
131#[cfg(target_family = "wasm")]
132type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
133
134#[cfg(not(target_family = "wasm"))]
135type NotificationHandlerFn =
136 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
137#[cfg(target_family = "wasm")]
138type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
139
140/// Enum controlling if a loop running callbacks should continue or abort.
141///
142/// This is mainly used in the [`sync_with_callback`] method, the return value
143/// of the provided callback controls if the sync loop should be exited.
144///
145/// [`sync_with_callback`]: #method.sync_with_callback
146#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub enum LoopCtrl {
148 /// Continue running the loop.
149 Continue,
150 /// Break out of the loop.
151 Break,
152}
153
154/// Represents changes that can occur to a `Client`s `Session`.
155#[derive(Debug, Clone, PartialEq)]
156pub enum SessionChange {
157 /// The session's token is no longer valid.
158 UnknownToken {
159 /// Whether or not the session was soft logged out
160 soft_logout: bool,
161 },
162 /// The session's tokens have been refreshed.
163 TokensRefreshed,
164}
165
166/// Information about the server vendor obtained from the federation API.
167#[derive(Debug, Clone, PartialEq, Eq)]
168#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
169pub struct ServerVendorInfo {
170 /// The server name.
171 pub server_name: String,
172 /// The server version.
173 pub version: String,
174}
175
176/// An async/await enabled Matrix client.
177///
178/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
179#[derive(Clone)]
180pub struct Client {
181 pub(crate) inner: Arc<ClientInner>,
182}
183
184#[derive(Default)]
185pub(crate) struct ClientLocks {
186 /// Lock ensuring that only a single room may be marked as a DM at once.
187 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
188 /// explanation.
189 pub(crate) mark_as_dm_lock: Mutex<()>,
190
191 /// Lock ensuring that only a single secret store is getting opened at the
192 /// same time.
193 ///
194 /// This is important so we don't accidentally create multiple different new
195 /// default secret storage keys.
196 #[cfg(feature = "e2e-encryption")]
197 pub(crate) open_secret_store_lock: Mutex<()>,
198
199 /// Lock ensuring that we're only storing a single secret at a time.
200 ///
201 /// Take a look at the [`SecretStore::put_secret`] method for a more
202 /// detailed explanation.
203 ///
204 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
205 #[cfg(feature = "e2e-encryption")]
206 pub(crate) store_secret_lock: Mutex<()>,
207
208 /// Lock ensuring that only one method at a time might modify our backup.
209 #[cfg(feature = "e2e-encryption")]
210 pub(crate) backup_modify_lock: Mutex<()>,
211
212 /// Lock ensuring that we're going to attempt to upload backups for a single
213 /// requester.
214 #[cfg(feature = "e2e-encryption")]
215 pub(crate) backup_upload_lock: Mutex<()>,
216
217 /// Handler making sure we only have one group session sharing request in
218 /// flight per room.
219 #[cfg(feature = "e2e-encryption")]
220 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
221
222 /// Lock making sure we're only doing one key claim request at a time.
223 #[cfg(feature = "e2e-encryption")]
224 pub(crate) key_claim_lock: Mutex<()>,
225
226 /// Handler to ensure that only one members request is running at a time,
227 /// given a room.
228 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
229
230 /// Handler to ensure that only one encryption state request is running at a
231 /// time, given a room.
232 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
233
234 /// Deduplicating handler for sending read receipts. The string is an
235 /// internal implementation detail, see [`Self::send_single_receipt`].
236 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
237
238 #[cfg(feature = "e2e-encryption")]
239 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
240
241 /// Latest "generation" of data known by the crypto store.
242 ///
243 /// This is a counter that only increments, set in the database (and can
244 /// wrap). It's incremented whenever some process acquires a lock for the
245 /// first time. *This assumes the crypto store lock is being held, to
246 /// avoid data races on writing to this value in the store*.
247 ///
248 /// The current process will maintain this value in local memory and in the
249 /// DB over time. Observing a different value than the one read in
250 /// memory, when reading from the store indicates that somebody else has
251 /// written into the database under our feet.
252 ///
253 /// TODO: this should live in the `OlmMachine`, since it's information
254 /// related to the lock. As of today (2023-07-28), we blow up the entire
255 /// olm machine when there's a generation mismatch. So storing the
256 /// generation in the olm machine would make the client think there's
257 /// *always* a mismatch, and that's why we need to store the generation
258 /// outside the `OlmMachine`.
259 #[cfg(feature = "e2e-encryption")]
260 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
261}
262
263pub(crate) struct ClientInner {
264 /// All the data related to authentication and authorization.
265 pub(crate) auth_ctx: Arc<AuthCtx>,
266
267 /// The URL of the server.
268 ///
269 /// Not to be confused with the `Self::homeserver`. `server` is usually
270 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
271 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
272 /// homeserver (at the time of writing — 2024-08-28).
273 ///
274 /// This value is optional depending on how the `Client` has been built.
275 /// If it's been built from a homeserver URL directly, we don't know the
276 /// server. However, if the `Client` has been built from a server URL or
277 /// name, then the homeserver has been discovered, and we know both.
278 server: Option<Url>,
279
280 /// The URL of the homeserver to connect to.
281 ///
282 /// This is the URL for the client-server Matrix API.
283 homeserver: StdRwLock<Url>,
284
285 /// The sliding sync version.
286 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
287
288 /// The underlying HTTP client.
289 pub(crate) http_client: HttpClient,
290
291 /// User session data.
292 pub(super) base_client: BaseClient,
293
294 /// Collection of in-memory caches for the [`Client`].
295 pub(crate) caches: ClientCaches,
296
297 /// Collection of locks individual client methods might want to use, either
298 /// to ensure that only a single call to a method happens at once or to
299 /// deduplicate multiple calls to a method.
300 pub(crate) locks: ClientLocks,
301
302 /// The cross-process store locks holder name.
303 ///
304 /// The SDK provides cross-process store locks (see
305 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
306 /// `holder_name` is the value used for all cross-process store locks
307 /// used by this `Client`.
308 ///
309 /// If multiple `Client`s are running in different processes, this
310 /// value MUST be different for each `Client`.
311 cross_process_store_locks_holder_name: String,
312
313 /// A mapping of the times at which the current user sent typing notices,
314 /// keyed by room.
315 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
316
317 /// Event handlers. See `add_event_handler`.
318 pub(crate) event_handlers: EventHandlerStore,
319
320 /// Notification handlers. See `register_notification_handler`.
321 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
322
323 /// The sender-side of channels used to receive room updates.
324 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
325
326 /// The sender-side of a channel used to observe all the room updates of a
327 /// sync response.
328 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
329
330 /// Whether the client should update its homeserver URL with the discovery
331 /// information present in the login response.
332 respect_login_well_known: bool,
333
334 /// An event that can be listened on to wait for a successful sync. The
335 /// event will only be fired if a sync loop is running. Can be used for
336 /// synchronization, e.g. if we send out a request to create a room, we can
337 /// wait for the sync to get the data to fetch a room object from the state
338 /// store.
339 pub(crate) sync_beat: event_listener::Event,
340
341 /// A central cache for events, inactive first.
342 ///
343 /// It becomes active when [`EventCache::subscribe`] is called.
344 pub(crate) event_cache: OnceCell<EventCache>,
345
346 /// End-to-end encryption related state.
347 #[cfg(feature = "e2e-encryption")]
348 pub(crate) e2ee: EncryptionData,
349
350 /// The verification state of our own device.
351 #[cfg(feature = "e2e-encryption")]
352 pub(crate) verification_state: SharedObservable<VerificationState>,
353
354 /// Whether to enable the experimental support for sending and receiving
355 /// encrypted room history on invite, per [MSC4268].
356 ///
357 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
358 #[cfg(feature = "e2e-encryption")]
359 pub(crate) enable_share_history_on_invite: bool,
360
361 /// Data related to the [`SendQueue`].
362 ///
363 /// [`SendQueue`]: crate::send_queue::SendQueue
364 pub(crate) send_queue_data: Arc<SendQueueData>,
365
366 /// The `max_upload_size` value of the homeserver, it contains the max
367 /// request size you can send.
368 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
369
370 /// The entry point to get the [`LatestEvent`] of rooms and threads.
371 ///
372 /// [`LatestEvent`]: crate::latest_event::LatestEvent
373 latest_events: OnceCell<LatestEvents>,
374
375 /// Service handling the catching up of thread subscriptions in the
376 /// background.
377 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
378
379 #[cfg(feature = "experimental-search")]
380 /// Handler for [`RoomIndex`]'s of each room
381 search_index: SearchIndex,
382}
383
384impl ClientInner {
385 /// Create a new `ClientInner`.
386 ///
387 /// All the fields passed as parameters here are those that must be cloned
388 /// upon instantiation of a sub-client, e.g. a client specialized for
389 /// notifications.
390 #[allow(clippy::too_many_arguments)]
391 async fn new(
392 auth_ctx: Arc<AuthCtx>,
393 server: Option<Url>,
394 homeserver: Url,
395 sliding_sync_version: SlidingSyncVersion,
396 http_client: HttpClient,
397 base_client: BaseClient,
398 server_info: ClientServerInfo,
399 respect_login_well_known: bool,
400 event_cache: OnceCell<EventCache>,
401 send_queue: Arc<SendQueueData>,
402 latest_events: OnceCell<LatestEvents>,
403 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
404 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
405 cross_process_store_locks_holder_name: String,
406 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
407 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
408 ) -> Arc<Self> {
409 let caches = ClientCaches {
410 server_info: server_info.into(),
411 server_metadata: Mutex::new(TtlCache::new()),
412 };
413
414 let client = Self {
415 server,
416 homeserver: StdRwLock::new(homeserver),
417 auth_ctx,
418 sliding_sync_version: StdRwLock::new(sliding_sync_version),
419 http_client,
420 base_client,
421 caches,
422 locks: Default::default(),
423 cross_process_store_locks_holder_name,
424 typing_notice_times: Default::default(),
425 event_handlers: Default::default(),
426 notification_handlers: Default::default(),
427 room_update_channels: Default::default(),
428 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
429 // ballast for all observers to catch up.
430 room_updates_sender: broadcast::Sender::new(32),
431 respect_login_well_known,
432 sync_beat: event_listener::Event::new(),
433 event_cache,
434 send_queue_data: send_queue,
435 latest_events,
436 #[cfg(feature = "e2e-encryption")]
437 e2ee: EncryptionData::new(encryption_settings),
438 #[cfg(feature = "e2e-encryption")]
439 verification_state: SharedObservable::new(VerificationState::Unknown),
440 #[cfg(feature = "e2e-encryption")]
441 enable_share_history_on_invite,
442 server_max_upload_size: Mutex::new(OnceCell::new()),
443 #[cfg(feature = "experimental-search")]
444 search_index: search_index_handler,
445 thread_subscription_catchup,
446 };
447
448 #[allow(clippy::let_and_return)]
449 let client = Arc::new(client);
450
451 #[cfg(feature = "e2e-encryption")]
452 client.e2ee.initialize_tasks(&client);
453
454 let _ = client
455 .event_cache
456 .get_or_init(|| async {
457 EventCache::new(
458 WeakClient::from_inner(&client),
459 client.base_client.event_cache_store().clone(),
460 )
461 })
462 .await;
463
464 let _ = client
465 .thread_subscription_catchup
466 .get_or_init(|| async {
467 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
468 })
469 .await;
470
471 client
472 }
473}
474
475#[cfg(not(tarpaulin_include))]
476impl Debug for Client {
477 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
478 write!(fmt, "Client")
479 }
480}
481
482impl Client {
483 /// Create a new [`Client`] that will use the given homeserver.
484 ///
485 /// # Arguments
486 ///
487 /// * `homeserver_url` - The homeserver that the client should connect to.
488 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
489 Self::builder().homeserver_url(homeserver_url).build().await
490 }
491
492 /// Returns a subscriber that publishes an event every time the ignore user
493 /// list changes.
494 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
495 self.inner.base_client.subscribe_to_ignore_user_list_changes()
496 }
497
498 /// Create a new [`ClientBuilder`].
499 pub fn builder() -> ClientBuilder {
500 ClientBuilder::new()
501 }
502
503 pub(crate) fn base_client(&self) -> &BaseClient {
504 &self.inner.base_client
505 }
506
507 /// The underlying HTTP client.
508 pub fn http_client(&self) -> &reqwest::Client {
509 &self.inner.http_client.inner
510 }
511
512 pub(crate) fn locks(&self) -> &ClientLocks {
513 &self.inner.locks
514 }
515
516 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
517 &self.inner.auth_ctx
518 }
519
520 /// The cross-process store locks holder name.
521 ///
522 /// The SDK provides cross-process store locks (see
523 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
524 /// `holder_name` is the value used for all cross-process store locks
525 /// used by this `Client`.
526 pub fn cross_process_store_locks_holder_name(&self) -> &str {
527 &self.inner.cross_process_store_locks_holder_name
528 }
529
530 /// Change the homeserver URL used by this client.
531 ///
532 /// # Arguments
533 ///
534 /// * `homeserver_url` - The new URL to use.
535 fn set_homeserver(&self, homeserver_url: Url) {
536 *self.inner.homeserver.write().unwrap() = homeserver_url;
537 }
538
539 /// Change to a different homeserver and re-resolve well-known.
540 #[cfg(feature = "e2e-encryption")]
541 pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
542 &self,
543 homeserver_url: Url,
544 ) -> Result<()> {
545 self.set_homeserver(homeserver_url);
546 self.reset_server_info().await?;
547 if let ServerInfo { well_known: Some(well_known), .. } =
548 self.load_or_fetch_server_info().await?
549 {
550 self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
551 }
552 Ok(())
553 }
554
555 /// Get the capabilities of the homeserver.
556 ///
557 /// This method should be used to check what features are supported by the
558 /// homeserver.
559 ///
560 /// # Examples
561 ///
562 /// ```no_run
563 /// # use matrix_sdk::Client;
564 /// # use url::Url;
565 /// # async {
566 /// # let homeserver = Url::parse("http://example.com")?;
567 /// let client = Client::new(homeserver).await?;
568 ///
569 /// let capabilities = client.get_capabilities().await?;
570 ///
571 /// if capabilities.change_password.enabled {
572 /// // Change password
573 /// }
574 /// # anyhow::Ok(()) };
575 /// ```
576 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
577 let res = self.send(get_capabilities::v3::Request::new()).await?;
578 Ok(res.capabilities)
579 }
580
581 /// Get the server vendor information from the federation API.
582 ///
583 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
584 /// both the server's software name and version.
585 ///
586 /// # Examples
587 ///
588 /// ```no_run
589 /// # use matrix_sdk::Client;
590 /// # use url::Url;
591 /// # async {
592 /// # let homeserver = Url::parse("http://example.com")?;
593 /// let client = Client::new(homeserver).await?;
594 ///
595 /// let server_info = client.server_vendor_info(None).await?;
596 /// println!(
597 /// "Server: {}, Version: {}",
598 /// server_info.server_name, server_info.version
599 /// );
600 /// # anyhow::Ok(()) };
601 /// ```
602 pub async fn server_vendor_info(
603 &self,
604 request_config: Option<RequestConfig>,
605 ) -> HttpResult<ServerVendorInfo> {
606 let res = self
607 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
608 .await?;
609
610 // Extract server info, using defaults if fields are missing.
611 let server = res.server.unwrap_or_default();
612 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
613 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
614
615 Ok(ServerVendorInfo { server_name: server_name_str, version })
616 }
617
618 /// Get a copy of the default request config.
619 ///
620 /// The default request config is what's used when sending requests if no
621 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
622 /// function with such a parameter.
623 ///
624 /// If the default request config was not customized through
625 /// [`ClientBuilder`] when creating this `Client`, the returned value will
626 /// be equivalent to [`RequestConfig::default()`].
627 pub fn request_config(&self) -> RequestConfig {
628 self.inner.http_client.request_config
629 }
630
631 /// Check whether the client has been activated.
632 ///
633 /// A client is considered active when:
634 ///
635 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
636 /// is logged in,
637 /// 2. Has loaded cached data from storage,
638 /// 3. If encryption is enabled, it also initialized or restored its
639 /// `OlmMachine`.
640 pub fn is_active(&self) -> bool {
641 self.inner.base_client.is_active()
642 }
643
644 /// The server used by the client.
645 ///
646 /// See `Self::server` to learn more.
647 pub fn server(&self) -> Option<&Url> {
648 self.inner.server.as_ref()
649 }
650
651 /// The homeserver of the client.
652 pub fn homeserver(&self) -> Url {
653 self.inner.homeserver.read().unwrap().clone()
654 }
655
656 /// Get the sliding sync version.
657 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
658 self.inner.sliding_sync_version.read().unwrap().clone()
659 }
660
661 /// Override the sliding sync version.
662 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
663 let mut lock = self.inner.sliding_sync_version.write().unwrap();
664 *lock = version;
665 }
666
667 /// Get the Matrix user session meta information.
668 ///
669 /// If the client is currently logged in, this will return a
670 /// [`SessionMeta`] object which contains the user ID and device ID.
671 /// Otherwise it returns `None`.
672 pub fn session_meta(&self) -> Option<&SessionMeta> {
673 self.base_client().session_meta()
674 }
675
676 /// Returns a receiver that gets events for each room info update. To watch
677 /// for new events, use `receiver.resubscribe()`.
678 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
679 self.base_client().room_info_notable_update_receiver()
680 }
681
682 /// Performs a search for users.
683 /// The search is performed case-insensitively on user IDs and display names
684 ///
685 /// # Arguments
686 ///
687 /// * `search_term` - The search term for the search
688 /// * `limit` - The maximum number of results to return. Defaults to 10.
689 ///
690 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
691 pub async fn search_users(
692 &self,
693 search_term: &str,
694 limit: u64,
695 ) -> HttpResult<search_users::v3::Response> {
696 let mut request = search_users::v3::Request::new(search_term.to_owned());
697
698 if let Some(limit) = UInt::new(limit) {
699 request.limit = limit;
700 }
701
702 self.send(request).await
703 }
704
705 /// Get the user id of the current owner of the client.
706 pub fn user_id(&self) -> Option<&UserId> {
707 self.session_meta().map(|s| s.user_id.as_ref())
708 }
709
710 /// Get the device ID that identifies the current session.
711 pub fn device_id(&self) -> Option<&DeviceId> {
712 self.session_meta().map(|s| s.device_id.as_ref())
713 }
714
715 /// Get the current access token for this session.
716 ///
717 /// Will be `None` if the client has not been logged in.
718 pub fn access_token(&self) -> Option<String> {
719 self.auth_ctx().access_token()
720 }
721
722 /// Get the current tokens for this session.
723 ///
724 /// To be notified of changes in the session tokens, use
725 /// [`Client::subscribe_to_session_changes()`] or
726 /// [`Client::set_session_callbacks()`].
727 ///
728 /// Returns `None` if the client has not been logged in.
729 pub fn session_tokens(&self) -> Option<SessionTokens> {
730 self.auth_ctx().session_tokens()
731 }
732
733 /// Access the authentication API used to log in this client.
734 ///
735 /// Will be `None` if the client has not been logged in.
736 pub fn auth_api(&self) -> Option<AuthApi> {
737 match self.auth_ctx().auth_data.get()? {
738 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
739 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
740 }
741 }
742
743 /// Get the whole session info of this client.
744 ///
745 /// Will be `None` if the client has not been logged in.
746 ///
747 /// Can be used with [`Client::restore_session`] to restore a previously
748 /// logged-in session.
749 pub fn session(&self) -> Option<AuthSession> {
750 match self.auth_api()? {
751 AuthApi::Matrix(api) => api.session().map(Into::into),
752 AuthApi::OAuth(api) => api.full_session().map(Into::into),
753 }
754 }
755
756 /// Get a reference to the state store.
757 pub fn state_store(&self) -> &DynStateStore {
758 self.base_client().state_store()
759 }
760
761 /// Get a reference to the event cache store.
762 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
763 self.base_client().event_cache_store()
764 }
765
766 /// Get a reference to the media store.
767 pub fn media_store(&self) -> &MediaStoreLock {
768 self.base_client().media_store()
769 }
770
771 /// Access the native Matrix authentication API with this client.
772 pub fn matrix_auth(&self) -> MatrixAuth {
773 MatrixAuth::new(self.clone())
774 }
775
776 /// Get the account of the current owner of the client.
777 pub fn account(&self) -> Account {
778 Account::new(self.clone())
779 }
780
781 /// Get the encryption manager of the client.
782 #[cfg(feature = "e2e-encryption")]
783 pub fn encryption(&self) -> Encryption {
784 Encryption::new(self.clone())
785 }
786
787 /// Get the media manager of the client.
788 pub fn media(&self) -> Media {
789 Media::new(self.clone())
790 }
791
792 /// Get the pusher manager of the client.
793 pub fn pusher(&self) -> Pusher {
794 Pusher::new(self.clone())
795 }
796
797 /// Access the OAuth 2.0 API of the client.
798 pub fn oauth(&self) -> OAuth {
799 OAuth::new(self.clone())
800 }
801
802 /// Register a handler for a specific event type.
803 ///
804 /// The handler is a function or closure with one or more arguments. The
805 /// first argument is the event itself. All additional arguments are
806 /// "context" arguments: They have to implement [`EventHandlerContext`].
807 /// This trait is named that way because most of the types implementing it
808 /// give additional context about an event: The room it was in, its raw form
809 /// and other similar things. As two exceptions to this,
810 /// [`Client`] and [`EventHandlerHandle`] also implement the
811 /// `EventHandlerContext` trait so you don't have to clone your client
812 /// into the event handler manually and a handler can decide to remove
813 /// itself.
814 ///
815 /// Some context arguments are not universally applicable. A context
816 /// argument that isn't available for the given event type will result in
817 /// the event handler being skipped and an error being logged. The following
818 /// context argument types are only available for a subset of event types:
819 ///
820 /// * [`Room`] is only available for room-specific events, i.e. not for
821 /// events like global account data events or presence events.
822 ///
823 /// You can provide custom context via
824 /// [`add_event_handler_context`](Client::add_event_handler_context) and
825 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
826 /// into the event handler.
827 ///
828 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
829 ///
830 /// # Examples
831 ///
832 /// ```no_run
833 /// use matrix_sdk::{
834 /// deserialized_responses::EncryptionInfo,
835 /// event_handler::Ctx,
836 /// ruma::{
837 /// events::{
838 /// macros::EventContent,
839 /// push_rules::PushRulesEvent,
840 /// room::{
841 /// message::SyncRoomMessageEvent,
842 /// topic::SyncRoomTopicEvent,
843 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
844 /// },
845 /// },
846 /// push::Action,
847 /// Int, MilliSecondsSinceUnixEpoch,
848 /// },
849 /// Client, Room,
850 /// };
851 /// use serde::{Deserialize, Serialize};
852 ///
853 /// # async fn example(client: Client) {
854 /// client.add_event_handler(
855 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
856 /// // Common usage: Room event plus room and client.
857 /// },
858 /// );
859 /// client.add_event_handler(
860 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
861 /// async move {
862 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
863 /// // unencrypted events and events that were decrypted by the SDK.
864 /// }
865 /// },
866 /// );
867 /// client.add_event_handler(
868 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
869 /// async move {
870 /// // A `Vec<Action>` parameter allows you to know which push actions
871 /// // are applicable for an event. For example, an event with
872 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
873 /// // in the timeline.
874 /// }
875 /// },
876 /// );
877 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
878 /// // You can omit any or all arguments after the first.
879 /// });
880 ///
881 /// // Registering a temporary event handler:
882 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
883 /// /* Event handler */
884 /// });
885 /// client.remove_event_handler(handle);
886 ///
887 /// // Registering custom event handler context:
888 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
889 /// struct MyContext {
890 /// number: usize,
891 /// }
892 /// client.add_event_handler_context(MyContext { number: 5 });
893 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
894 /// // Use the context
895 /// });
896 ///
897 /// // This will handle membership events in joined rooms. Invites are special, see below.
898 /// client.add_event_handler(
899 /// |ev: SyncRoomMemberEvent| async move {},
900 /// );
901 ///
902 /// // To handle state events in invited rooms (including invite membership events),
903 /// // `StrippedRoomMemberEvent` should be used.
904 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
905 /// client.add_event_handler(
906 /// |ev: StrippedRoomMemberEvent| async move {},
907 /// );
908 ///
909 /// // Custom events work exactly the same way, you just need to declare
910 /// // the content struct and use the EventContent derive macro on it.
911 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
912 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
913 /// struct TokenEventContent {
914 /// token: String,
915 /// #[serde(rename = "exp")]
916 /// expires_at: MilliSecondsSinceUnixEpoch,
917 /// }
918 ///
919 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
920 /// todo!("Display the token");
921 /// });
922 ///
923 /// // Event handler closures can also capture local variables.
924 /// // Make sure they are cheap to clone though, because they will be cloned
925 /// // every time the closure is called.
926 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
927 ///
928 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
929 /// println!("Calling the handler with identifier {data}");
930 /// });
931 /// # }
932 /// ```
933 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
934 where
935 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
936 H: EventHandler<Ev, Ctx>,
937 {
938 self.add_event_handler_impl(handler, None)
939 }
940
941 /// Register a handler for a specific room, and event type.
942 ///
943 /// This method works the same way as
944 /// [`add_event_handler`][Self::add_event_handler], except that the handler
945 /// will only be called for events in the room with the specified ID. See
946 /// that method for more details on event handler functions.
947 ///
948 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
949 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
950 /// your use case.
951 pub fn add_room_event_handler<Ev, Ctx, H>(
952 &self,
953 room_id: &RoomId,
954 handler: H,
955 ) -> EventHandlerHandle
956 where
957 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
958 H: EventHandler<Ev, Ctx>,
959 {
960 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
961 }
962
963 /// Observe a specific event type.
964 ///
965 /// `Ev` represents the kind of event that will be observed. `Ctx`
966 /// represents the context that will come with the event. It relies on the
967 /// same mechanism as [`Client::add_event_handler`]. The main difference is
968 /// that it returns an [`ObservableEventHandler`] and doesn't require a
969 /// user-defined closure. It is possible to subscribe to the
970 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
971 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
972 /// Ctx)`.
973 ///
974 /// Be careful that only the most recent value can be observed. Subscribers
975 /// are notified when a new value is sent, but there is no guarantee
976 /// that they will see all values.
977 ///
978 /// # Example
979 ///
980 /// Let's see a classical usage:
981 ///
982 /// ```
983 /// use futures_util::StreamExt as _;
984 /// use matrix_sdk::{
985 /// Client, Room,
986 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
987 /// };
988 ///
989 /// # async fn example(client: Client) -> Option<()> {
990 /// let observer =
991 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
992 ///
993 /// let mut subscriber = observer.subscribe();
994 ///
995 /// let (event, (room, push_actions)) = subscriber.next().await?;
996 /// # Some(())
997 /// # }
998 /// ```
999 ///
1000 /// Now let's see how to get several contexts that can be useful for you:
1001 ///
1002 /// ```
1003 /// use matrix_sdk::{
1004 /// Client, Room,
1005 /// deserialized_responses::EncryptionInfo,
1006 /// ruma::{
1007 /// events::room::{
1008 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1009 /// },
1010 /// push::Action,
1011 /// },
1012 /// };
1013 ///
1014 /// # async fn example(client: Client) {
1015 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1016 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1017 ///
1018 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1019 /// // to distinguish between unencrypted events and events that were decrypted
1020 /// // by the SDK.
1021 /// let _ = client
1022 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1023 /// );
1024 ///
1025 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1026 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1027 /// // should be highlighted in the timeline.
1028 /// let _ =
1029 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1030 ///
1031 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1032 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1033 /// # }
1034 /// ```
1035 ///
1036 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1037 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1038 where
1039 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1040 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1041 {
1042 self.observe_room_events_impl(None)
1043 }
1044
1045 /// Observe a specific room, and event type.
1046 ///
1047 /// This method works the same way as [`Client::observe_events`], except
1048 /// that the observability will only be applied for events in the room with
1049 /// the specified ID. See that method for more details.
1050 ///
1051 /// Be careful that only the most recent value can be observed. Subscribers
1052 /// are notified when a new value is sent, but there is no guarantee
1053 /// that they will see all values.
1054 pub fn observe_room_events<Ev, Ctx>(
1055 &self,
1056 room_id: &RoomId,
1057 ) -> ObservableEventHandler<(Ev, Ctx)>
1058 where
1059 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1060 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1061 {
1062 self.observe_room_events_impl(Some(room_id.to_owned()))
1063 }
1064
1065 /// Shared implementation for `Client::observe_events` and
1066 /// `Client::observe_room_events`.
1067 fn observe_room_events_impl<Ev, Ctx>(
1068 &self,
1069 room_id: Option<OwnedRoomId>,
1070 ) -> ObservableEventHandler<(Ev, Ctx)>
1071 where
1072 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1073 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1074 {
1075 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1076 // new value.
1077 let shared_observable = SharedObservable::new(None);
1078
1079 ObservableEventHandler::new(
1080 shared_observable.clone(),
1081 self.event_handler_drop_guard(self.add_event_handler_impl(
1082 move |event: Ev, context: Ctx| {
1083 shared_observable.set(Some((event, context)));
1084
1085 ready(())
1086 },
1087 room_id,
1088 )),
1089 )
1090 }
1091
1092 /// Remove the event handler associated with the handle.
1093 ///
1094 /// Note that you **must not** call `remove_event_handler` from the
1095 /// non-async part of an event handler, that is:
1096 ///
1097 /// ```ignore
1098 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1099 /// // ⚠ this will cause a deadlock ⚠
1100 /// client.remove_event_handler(handle);
1101 ///
1102 /// async move {
1103 /// // removing the event handler here is fine
1104 /// client.remove_event_handler(handle);
1105 /// }
1106 /// })
1107 /// ```
1108 ///
1109 /// Note also that handlers that remove themselves will still execute with
1110 /// events received in the same sync cycle.
1111 ///
1112 /// # Arguments
1113 ///
1114 /// `handle` - The [`EventHandlerHandle`] that is returned when
1115 /// registering the event handler with [`Client::add_event_handler`].
1116 ///
1117 /// # Examples
1118 ///
1119 /// ```no_run
1120 /// # use url::Url;
1121 /// # use tokio::sync::mpsc;
1122 /// #
1123 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1124 /// #
1125 /// use matrix_sdk::{
1126 /// Client, event_handler::EventHandlerHandle,
1127 /// ruma::events::room::member::SyncRoomMemberEvent,
1128 /// };
1129 /// #
1130 /// # futures_executor::block_on(async {
1131 /// # let client = matrix_sdk::Client::builder()
1132 /// # .homeserver_url(homeserver)
1133 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1134 /// # .build()
1135 /// # .await
1136 /// # .unwrap();
1137 ///
1138 /// client.add_event_handler(
1139 /// |ev: SyncRoomMemberEvent,
1140 /// client: Client,
1141 /// handle: EventHandlerHandle| async move {
1142 /// // Common usage: Check arriving Event is the expected one
1143 /// println!("Expected RoomMemberEvent received!");
1144 /// client.remove_event_handler(handle);
1145 /// },
1146 /// );
1147 /// # });
1148 /// ```
1149 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1150 self.inner.event_handlers.remove(handle);
1151 }
1152
1153 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1154 /// the given handle.
1155 ///
1156 /// When the returned value is dropped, the event handler will be removed.
1157 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1158 EventHandlerDropGuard::new(handle, self.clone())
1159 }
1160
1161 /// Add an arbitrary value for use as event handler context.
1162 ///
1163 /// The value can be obtained in an event handler by adding an argument of
1164 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1165 ///
1166 /// If a value of the same type has been added before, it will be
1167 /// overwritten.
1168 ///
1169 /// # Examples
1170 ///
1171 /// ```no_run
1172 /// use matrix_sdk::{
1173 /// Room, event_handler::Ctx,
1174 /// ruma::events::room::message::SyncRoomMessageEvent,
1175 /// };
1176 /// # #[derive(Clone)]
1177 /// # struct SomeType;
1178 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1179 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1180 /// # futures_executor::block_on(async {
1181 /// # let client = matrix_sdk::Client::builder()
1182 /// # .homeserver_url(homeserver)
1183 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1184 /// # .build()
1185 /// # .await
1186 /// # .unwrap();
1187 ///
1188 /// // Handle used to send messages to the UI part of the app
1189 /// let my_gui_handle: SomeType = obtain_gui_handle();
1190 ///
1191 /// client.add_event_handler_context(my_gui_handle.clone());
1192 /// client.add_event_handler(
1193 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1194 /// async move {
1195 /// // gui_handle.send(DisplayMessage { message: ev });
1196 /// }
1197 /// },
1198 /// );
1199 /// # });
1200 /// ```
1201 pub fn add_event_handler_context<T>(&self, ctx: T)
1202 where
1203 T: Clone + Send + Sync + 'static,
1204 {
1205 self.inner.event_handlers.add_context(ctx);
1206 }
1207
1208 /// Register a handler for a notification.
1209 ///
1210 /// Similar to [`Client::add_event_handler`], but only allows functions
1211 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1212 /// [`Client`] for now.
1213 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1214 where
1215 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1216 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1217 {
1218 self.inner.notification_handlers.write().await.push(Box::new(
1219 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1220 ));
1221
1222 self
1223 }
1224
1225 /// Subscribe to all updates for the room with the given ID.
1226 ///
1227 /// The returned receiver will receive a new message for each sync response
1228 /// that contains updates for that room.
1229 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1230 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1231 btree_map::Entry::Vacant(entry) => {
1232 let (tx, rx) = broadcast::channel(8);
1233 entry.insert(tx);
1234 rx
1235 }
1236 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1237 }
1238 }
1239
1240 /// Subscribe to all updates to all rooms, whenever any has been received in
1241 /// a sync response.
1242 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1243 self.inner.room_updates_sender.subscribe()
1244 }
1245
1246 pub(crate) async fn notification_handlers(
1247 &self,
1248 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1249 self.inner.notification_handlers.read().await
1250 }
1251
1252 /// Get all the rooms the client knows about.
1253 ///
1254 /// This will return the list of joined, invited, and left rooms.
1255 pub fn rooms(&self) -> Vec<Room> {
1256 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1257 }
1258
1259 /// Get all the rooms the client knows about, filtered by room state.
1260 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1261 self.base_client()
1262 .rooms_filtered(filter)
1263 .into_iter()
1264 .map(|room| Room::new(self.clone(), room))
1265 .collect()
1266 }
1267
1268 /// Get a stream of all the rooms, in addition to the existing rooms.
1269 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1270 let (rooms, stream) = self.base_client().rooms_stream();
1271
1272 let map_room = |room| Room::new(self.clone(), room);
1273
1274 (
1275 rooms.into_iter().map(map_room).collect(),
1276 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1277 )
1278 }
1279
1280 /// Returns the joined rooms this client knows about.
1281 pub fn joined_rooms(&self) -> Vec<Room> {
1282 self.rooms_filtered(RoomStateFilter::JOINED)
1283 }
1284
1285 /// Returns the invited rooms this client knows about.
1286 pub fn invited_rooms(&self) -> Vec<Room> {
1287 self.rooms_filtered(RoomStateFilter::INVITED)
1288 }
1289
1290 /// Returns the left rooms this client knows about.
1291 pub fn left_rooms(&self) -> Vec<Room> {
1292 self.rooms_filtered(RoomStateFilter::LEFT)
1293 }
1294
1295 /// Returns the joined space rooms this client knows about.
1296 pub fn joined_space_rooms(&self) -> Vec<Room> {
1297 self.base_client()
1298 .rooms_filtered(RoomStateFilter::JOINED)
1299 .into_iter()
1300 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1301 .collect()
1302 }
1303
1304 /// Get a room with the given room id.
1305 ///
1306 /// # Arguments
1307 ///
1308 /// `room_id` - The unique id of the room that should be fetched.
1309 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1310 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1311 }
1312
1313 /// Gets the preview of a room, whether the current user has joined it or
1314 /// not.
1315 pub async fn get_room_preview(
1316 &self,
1317 room_or_alias_id: &RoomOrAliasId,
1318 via: Vec<OwnedServerName>,
1319 ) -> Result<RoomPreview> {
1320 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1321 Ok(room_id) => room_id.to_owned(),
1322 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1323 };
1324
1325 if let Some(room) = self.get_room(&room_id) {
1326 // The cached data can only be trusted if the room state is joined or
1327 // banned: for invite and knock rooms, no updates will be received
1328 // for the rooms after the invite/knock action took place so we may
1329 // have very out to date data for important fields such as
1330 // `join_rule`. For left rooms, the homeserver should return the latest info.
1331 match room.state() {
1332 RoomState::Joined | RoomState::Banned => {
1333 return Ok(RoomPreview::from_known_room(&room).await);
1334 }
1335 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1336 }
1337 }
1338
1339 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1340 }
1341
1342 /// Resolve a room alias to a room id and a list of servers which know
1343 /// about it.
1344 ///
1345 /// # Arguments
1346 ///
1347 /// `room_alias` - The room alias to be resolved.
1348 pub async fn resolve_room_alias(
1349 &self,
1350 room_alias: &RoomAliasId,
1351 ) -> HttpResult<get_alias::v3::Response> {
1352 let request = get_alias::v3::Request::new(room_alias.to_owned());
1353 self.send(request).await
1354 }
1355
1356 /// Checks if a room alias is not in use yet.
1357 ///
1358 /// Returns:
1359 /// - `Ok(true)` if the room alias is available.
1360 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1361 /// status code).
1362 /// - An `Err` otherwise.
1363 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1364 match self.resolve_room_alias(alias).await {
1365 // The room alias was resolved, so it's already in use.
1366 Ok(_) => Ok(false),
1367 Err(error) => {
1368 match error.client_api_error_kind() {
1369 // The room alias wasn't found, so it's available.
1370 Some(ErrorKind::NotFound) => Ok(true),
1371 _ => Err(error),
1372 }
1373 }
1374 }
1375 }
1376
1377 /// Adds a new room alias associated with a room to the room directory.
1378 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1379 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1380 self.send(request).await?;
1381 Ok(())
1382 }
1383
1384 /// Removes a room alias from the room directory.
1385 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1386 let request = delete_alias::v3::Request::new(alias.to_owned());
1387 self.send(request).await?;
1388 Ok(())
1389 }
1390
1391 /// Update the homeserver from the login response well-known if needed.
1392 ///
1393 /// # Arguments
1394 ///
1395 /// * `login_well_known` - The `well_known` field from a successful login
1396 /// response.
1397 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1398 if self.inner.respect_login_well_known
1399 && let Some(well_known) = login_well_known
1400 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1401 {
1402 self.set_homeserver(homeserver);
1403 }
1404 }
1405
1406 /// Similar to [`Client::restore_session_with`], with
1407 /// [`RoomLoadSettings::default()`].
1408 ///
1409 /// # Panics
1410 ///
1411 /// Panics if a session was already restored or logged in.
1412 #[instrument(skip_all)]
1413 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1414 self.restore_session_with(session, RoomLoadSettings::default()).await
1415 }
1416
1417 /// Restore a session previously logged-in using one of the available
1418 /// authentication APIs. The number of rooms to restore is controlled by
1419 /// [`RoomLoadSettings`].
1420 ///
1421 /// See the documentation of the corresponding authentication API's
1422 /// `restore_session` method for more information.
1423 ///
1424 /// # Panics
1425 ///
1426 /// Panics if a session was already restored or logged in.
1427 #[instrument(skip_all)]
1428 pub async fn restore_session_with(
1429 &self,
1430 session: impl Into<AuthSession>,
1431 room_load_settings: RoomLoadSettings,
1432 ) -> Result<()> {
1433 let session = session.into();
1434 match session {
1435 AuthSession::Matrix(session) => {
1436 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1437 }
1438 AuthSession::OAuth(session) => {
1439 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1440 }
1441 }
1442 }
1443
1444 /// Refresh the access token using the authentication API used to log into
1445 /// this session.
1446 ///
1447 /// See the documentation of the authentication API's `refresh_access_token`
1448 /// method for more information.
1449 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1450 let Some(auth_api) = self.auth_api() else {
1451 return Err(RefreshTokenError::RefreshTokenRequired);
1452 };
1453
1454 match auth_api {
1455 AuthApi::Matrix(api) => {
1456 trace!("Token refresh: Using the homeserver.");
1457 Box::pin(api.refresh_access_token()).await?;
1458 }
1459 AuthApi::OAuth(api) => {
1460 trace!("Token refresh: Using OAuth 2.0.");
1461 Box::pin(api.refresh_access_token()).await?;
1462 }
1463 }
1464
1465 Ok(())
1466 }
1467
1468 /// Log out the current session using the proper authentication API.
1469 ///
1470 /// # Errors
1471 ///
1472 /// Returns an error if the session is not authenticated or if an error
1473 /// occurred while making the request to the server.
1474 pub async fn logout(&self) -> Result<(), Error> {
1475 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1476 match auth_api {
1477 AuthApi::Matrix(matrix_auth) => {
1478 matrix_auth.logout().await?;
1479 Ok(())
1480 }
1481 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1482 }
1483 }
1484
1485 /// Get or upload a sync filter.
1486 ///
1487 /// This method will either get a filter ID from the store or upload the
1488 /// filter definition to the homeserver and return the new filter ID.
1489 ///
1490 /// # Arguments
1491 ///
1492 /// * `filter_name` - The unique name of the filter, this name will be used
1493 /// locally to store and identify the filter ID returned by the server.
1494 ///
1495 /// * `definition` - The filter definition that should be uploaded to the
1496 /// server if no filter ID can be found in the store.
1497 ///
1498 /// # Examples
1499 ///
1500 /// ```no_run
1501 /// # use matrix_sdk::{
1502 /// # Client, config::SyncSettings,
1503 /// # ruma::api::client::{
1504 /// # filter::{
1505 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1506 /// # },
1507 /// # sync::sync_events::v3::Filter,
1508 /// # }
1509 /// # };
1510 /// # use url::Url;
1511 /// # async {
1512 /// # let homeserver = Url::parse("http://example.com").unwrap();
1513 /// # let client = Client::new(homeserver).await.unwrap();
1514 /// let mut filter = FilterDefinition::default();
1515 ///
1516 /// // Let's enable member lazy loading.
1517 /// filter.room.state.lazy_load_options =
1518 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1519 ///
1520 /// let filter_id = client
1521 /// .get_or_upload_filter("sync", filter)
1522 /// .await
1523 /// .unwrap();
1524 ///
1525 /// let sync_settings = SyncSettings::new()
1526 /// .filter(Filter::FilterId(filter_id));
1527 ///
1528 /// let response = client.sync_once(sync_settings).await.unwrap();
1529 /// # };
1530 #[instrument(skip(self, definition))]
1531 pub async fn get_or_upload_filter(
1532 &self,
1533 filter_name: &str,
1534 definition: FilterDefinition,
1535 ) -> Result<String> {
1536 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1537 debug!("Found filter locally");
1538 Ok(filter)
1539 } else {
1540 debug!("Didn't find filter locally");
1541 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1542 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1543 let response = self.send(request).await?;
1544
1545 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1546
1547 Ok(response.filter_id)
1548 }
1549 }
1550
1551 /// Prepare to join a room by ID, by getting the current details about it
1552 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1553 let room = self.get_room(room_id)?;
1554
1555 let inviter = match room.invite_details().await {
1556 Ok(details) => details.inviter,
1557 Err(Error::WrongRoomState(_)) => None,
1558 Err(e) => {
1559 warn!("Error fetching invite details for room: {e:?}");
1560 None
1561 }
1562 };
1563
1564 Some(PreJoinRoomInfo { inviter })
1565 }
1566
1567 /// Finish joining a room.
1568 ///
1569 /// If the room was an invite that should be marked as a DM, will include it
1570 /// in the DM event after creating the joined room.
1571 ///
1572 /// If encrypted history sharing is enabled, will check to see if we have a
1573 /// key bundle, and import it if so.
1574 ///
1575 /// # Arguments
1576 ///
1577 /// * `room_id` - The `RoomId` of the room that was joined.
1578 /// * `pre_join_room_info` - Information about the room before we joined.
1579 async fn finish_join_room(
1580 &self,
1581 room_id: &RoomId,
1582 pre_join_room_info: Option<PreJoinRoomInfo>,
1583 ) -> Result<Room> {
1584 info!(?room_id, ?pre_join_room_info, "Completing room join");
1585 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1586 room.state() == RoomState::Invited
1587 && room.is_direct().await.unwrap_or_else(|e| {
1588 warn!(%room_id, "is_direct() failed: {e}");
1589 false
1590 })
1591 } else {
1592 false
1593 };
1594
1595 let base_room = self
1596 .base_client()
1597 .room_joined(
1598 room_id,
1599 pre_join_room_info
1600 .as_ref()
1601 .and_then(|info| info.inviter.as_ref())
1602 .map(|i| i.user_id().to_owned()),
1603 )
1604 .await?;
1605 let room = Room::new(self.clone(), base_room);
1606
1607 if mark_as_dm {
1608 room.set_is_direct(true).await?;
1609 }
1610
1611 #[cfg(feature = "e2e-encryption")]
1612 if self.inner.enable_share_history_on_invite
1613 && let Some(inviter) =
1614 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1615 {
1616 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1617 .await?;
1618 }
1619
1620 // Suppress "unused variable" and "unused field" lints
1621 #[cfg(not(feature = "e2e-encryption"))]
1622 let _ = pre_join_room_info.map(|i| i.inviter);
1623
1624 Ok(room)
1625 }
1626
1627 /// Join a room by `RoomId`.
1628 ///
1629 /// Returns the `Room` in the joined state.
1630 ///
1631 /// # Arguments
1632 ///
1633 /// * `room_id` - The `RoomId` of the room to be joined.
1634 #[instrument(skip(self))]
1635 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1636 // See who invited us to this room, if anyone. Note we have to do this before
1637 // making the `/join` request, otherwise we could race against the sync.
1638 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1639
1640 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1641 let response = self.send(request).await?;
1642 self.finish_join_room(&response.room_id, pre_join_info).await
1643 }
1644
1645 /// Join a room by `RoomOrAliasId`.
1646 ///
1647 /// Returns the `Room` in the joined state.
1648 ///
1649 /// # Arguments
1650 ///
1651 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1652 /// alias looks like `#name:example.com`.
1653 /// * `server_names` - The server names to be used for resolving the alias,
1654 /// if needs be.
1655 #[instrument(skip(self))]
1656 pub async fn join_room_by_id_or_alias(
1657 &self,
1658 alias: &RoomOrAliasId,
1659 server_names: &[OwnedServerName],
1660 ) -> Result<Room> {
1661 let pre_join_info = {
1662 match alias.try_into() {
1663 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1664 Err(_) => {
1665 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1666 // responding to an invitation to the room, and therefore don't need to handle
1667 // things that happen as a result of invites.
1668 None
1669 }
1670 }
1671 };
1672 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1673 via: server_names.to_owned(),
1674 });
1675 let response = self.send(request).await?;
1676 self.finish_join_room(&response.room_id, pre_join_info).await
1677 }
1678
1679 /// Search the homeserver's directory of public rooms.
1680 ///
1681 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1682 /// a `get_public_rooms::Response`.
1683 ///
1684 /// # Arguments
1685 ///
1686 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1687 ///
1688 /// * `since` - Pagination token from a previous request.
1689 ///
1690 /// * `server` - The name of the server, if `None` the requested server is
1691 /// used.
1692 ///
1693 /// # Examples
1694 /// ```no_run
1695 /// use matrix_sdk::Client;
1696 /// # use url::Url;
1697 /// # let homeserver = Url::parse("http://example.com").unwrap();
1698 /// # let limit = Some(10);
1699 /// # let since = Some("since token");
1700 /// # let server = Some("servername.com".try_into().unwrap());
1701 /// # async {
1702 /// let mut client = Client::new(homeserver).await.unwrap();
1703 ///
1704 /// client.public_rooms(limit, since, server).await;
1705 /// # };
1706 /// ```
1707 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1708 pub async fn public_rooms(
1709 &self,
1710 limit: Option<u32>,
1711 since: Option<&str>,
1712 server: Option<&ServerName>,
1713 ) -> HttpResult<get_public_rooms::v3::Response> {
1714 let limit = limit.map(UInt::from);
1715
1716 let request = assign!(get_public_rooms::v3::Request::new(), {
1717 limit,
1718 since: since.map(ToOwned::to_owned),
1719 server: server.map(ToOwned::to_owned),
1720 });
1721 self.send(request).await
1722 }
1723
1724 /// Create a room with the given parameters.
1725 ///
1726 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1727 /// created room.
1728 ///
1729 /// If you want to create a direct message with one specific user, you can
1730 /// use [`create_dm`][Self::create_dm], which is more convenient than
1731 /// assembling the [`create_room::v3::Request`] yourself.
1732 ///
1733 /// If the `is_direct` field of the request is set to `true` and at least
1734 /// one user is invited, the room will be automatically added to the direct
1735 /// rooms in the account data.
1736 ///
1737 /// # Examples
1738 ///
1739 /// ```no_run
1740 /// use matrix_sdk::{
1741 /// Client,
1742 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1743 /// };
1744 /// # use url::Url;
1745 /// #
1746 /// # async {
1747 /// # let homeserver = Url::parse("http://example.com").unwrap();
1748 /// let request = CreateRoomRequest::new();
1749 /// let client = Client::new(homeserver).await.unwrap();
1750 /// assert!(client.create_room(request).await.is_ok());
1751 /// # };
1752 /// ```
1753 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1754 let invite = request.invite.clone();
1755 let is_direct_room = request.is_direct;
1756 let response = self.send(request).await?;
1757
1758 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1759
1760 let joined_room = Room::new(self.clone(), base_room);
1761
1762 if is_direct_room
1763 && !invite.is_empty()
1764 && let Err(error) =
1765 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1766 {
1767 // FIXME: Retry in the background
1768 error!("Failed to mark room as DM: {error}");
1769 }
1770
1771 Ok(joined_room)
1772 }
1773
1774 /// Create a DM room.
1775 ///
1776 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1777 /// given user being invited, the room marked `is_direct` and both the
1778 /// creator and invitee getting the default maximum power level.
1779 ///
1780 /// If the `e2e-encryption` feature is enabled, the room will also be
1781 /// encrypted.
1782 ///
1783 /// # Arguments
1784 ///
1785 /// * `user_id` - The ID of the user to create a DM for.
1786 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1787 #[cfg(feature = "e2e-encryption")]
1788 let initial_state = vec![
1789 InitialStateEvent::with_empty_state_key(
1790 RoomEncryptionEventContent::with_recommended_defaults(),
1791 )
1792 .to_raw_any(),
1793 ];
1794
1795 #[cfg(not(feature = "e2e-encryption"))]
1796 let initial_state = vec![];
1797
1798 let request = assign!(create_room::v3::Request::new(), {
1799 invite: vec![user_id.to_owned()],
1800 is_direct: true,
1801 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1802 initial_state,
1803 });
1804
1805 self.create_room(request).await
1806 }
1807
1808 /// Get the existing DM room with the given user, if any.
1809 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1810 let rooms = self.joined_rooms();
1811
1812 // Find the room we share with the `user_id` and only with `user_id`
1813 let room = rooms.into_iter().find(|r| {
1814 let targets = r.direct_targets();
1815 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1816 });
1817
1818 trace!(?user_id, ?room, "Found DM room with user");
1819 room
1820 }
1821
1822 /// Search the homeserver's directory for public rooms with a filter.
1823 ///
1824 /// # Arguments
1825 ///
1826 /// * `room_search` - The easiest way to create this request is using the
1827 /// `get_public_rooms_filtered::Request` itself.
1828 ///
1829 /// # Examples
1830 ///
1831 /// ```no_run
1832 /// # use url::Url;
1833 /// # use matrix_sdk::Client;
1834 /// # async {
1835 /// # let homeserver = Url::parse("http://example.com")?;
1836 /// use matrix_sdk::ruma::{
1837 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1838 /// };
1839 /// # let mut client = Client::new(homeserver).await?;
1840 ///
1841 /// let mut filter = Filter::new();
1842 /// filter.generic_search_term = Some("rust".to_owned());
1843 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1844 /// request.filter = filter;
1845 ///
1846 /// let response = client.public_rooms_filtered(request).await?;
1847 ///
1848 /// for room in response.chunk {
1849 /// println!("Found room {room:?}");
1850 /// }
1851 /// # anyhow::Ok(()) };
1852 /// ```
1853 pub async fn public_rooms_filtered(
1854 &self,
1855 request: get_public_rooms_filtered::v3::Request,
1856 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1857 self.send(request).await
1858 }
1859
1860 /// Send an arbitrary request to the server, without updating client state.
1861 ///
1862 /// **Warning:** Because this method *does not* update the client state, it
1863 /// is important to make sure that you account for this yourself, and
1864 /// use wrapper methods where available. This method should *only* be
1865 /// used if a wrapper method for the endpoint you'd like to use is not
1866 /// available.
1867 ///
1868 /// # Arguments
1869 ///
1870 /// * `request` - A filled out and valid request for the endpoint to be hit
1871 ///
1872 /// * `timeout` - An optional request timeout setting, this overrides the
1873 /// default request setting if one was set.
1874 ///
1875 /// # Examples
1876 ///
1877 /// ```no_run
1878 /// # use matrix_sdk::{Client, config::SyncSettings};
1879 /// # use url::Url;
1880 /// # async {
1881 /// # let homeserver = Url::parse("http://localhost:8080")?;
1882 /// # let mut client = Client::new(homeserver).await?;
1883 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1884 ///
1885 /// // First construct the request you want to make
1886 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1887 /// // for all available Endpoints
1888 /// let user_id = user_id!("@example:localhost").to_owned();
1889 /// let request = profile::get_profile::v3::Request::new(user_id);
1890 ///
1891 /// // Start the request using Client::send()
1892 /// let response = client.send(request).await?;
1893 ///
1894 /// // Check the corresponding Response struct to find out what types are
1895 /// // returned
1896 /// # anyhow::Ok(()) };
1897 /// ```
1898 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1899 where
1900 Request: OutgoingRequest + Clone + Debug,
1901 for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1902 Request::PathBuilder: SupportedPathBuilder,
1903 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1904 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1905 {
1906 SendRequest {
1907 client: self.clone(),
1908 request,
1909 config: None,
1910 send_progress: Default::default(),
1911 }
1912 }
1913
1914 pub(crate) async fn send_inner<Request>(
1915 &self,
1916 request: Request,
1917 config: Option<RequestConfig>,
1918 send_progress: SharedObservable<TransmissionProgress>,
1919 ) -> HttpResult<Request::IncomingResponse>
1920 where
1921 Request: OutgoingRequest + Debug,
1922 for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1923 Request::PathBuilder: SupportedPathBuilder,
1924 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1925 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1926 {
1927 let homeserver = self.homeserver().to_string();
1928 let access_token = self.access_token();
1929 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1930
1931 let path_builder_input =
1932 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1933
1934 self.inner
1935 .http_client
1936 .send(
1937 request,
1938 config,
1939 homeserver,
1940 access_token.as_deref(),
1941 path_builder_input,
1942 send_progress,
1943 )
1944 .await
1945 }
1946
1947 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1948 _ = self
1949 .inner
1950 .auth_ctx
1951 .session_change_sender
1952 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1953 }
1954
1955 /// Fetches server versions from network; no caching.
1956 pub async fn fetch_server_versions(
1957 &self,
1958 request_config: Option<RequestConfig>,
1959 ) -> HttpResult<get_supported_versions::Response> {
1960 let server_versions = self
1961 .send_inner(get_supported_versions::Request::new(), request_config, Default::default())
1962 .await?;
1963
1964 Ok(server_versions)
1965 }
1966
1967 /// Fetches client well_known from network; no caching.
1968 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1969 let server_url_string = self
1970 .server()
1971 .unwrap_or(
1972 // Sometimes people configure their well-known directly on the homeserver so use
1973 // this as a fallback when the server name is unknown.
1974 &self.homeserver(),
1975 )
1976 .to_string();
1977
1978 let well_known = self
1979 .inner
1980 .http_client
1981 .send(
1982 discover_homeserver::Request::new(),
1983 Some(RequestConfig::short_retry()),
1984 server_url_string,
1985 None,
1986 (),
1987 Default::default(),
1988 )
1989 .await;
1990
1991 match well_known {
1992 Ok(well_known) => Some(well_known),
1993 Err(http_error) => {
1994 // It is perfectly valid to not have a well-known file.
1995 // Maybe we should check for a specific error code to be sure?
1996 warn!("Failed to fetch client well-known: {http_error}");
1997 None
1998 }
1999 }
2000 }
2001
2002 /// Load server info from storage, or fetch them from network and cache
2003 /// them.
2004 async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
2005 match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
2006 Ok(Some(stored)) => {
2007 if let Some(server_info) =
2008 stored.into_server_info().and_then(|info| info.maybe_decode())
2009 {
2010 return Ok(server_info);
2011 }
2012 }
2013 Ok(None) => {
2014 // fallthrough: cache is empty
2015 }
2016 Err(err) => {
2017 warn!("error when loading cached server info: {err}");
2018 // fallthrough to network.
2019 }
2020 }
2021
2022 let server_versions = self.fetch_server_versions(None).await?;
2023 let well_known = self.fetch_client_well_known().await;
2024 let server_info = ServerInfo::new(
2025 server_versions.versions.clone(),
2026 server_versions.unstable_features.clone(),
2027 well_known.map(Into::into),
2028 );
2029
2030 // Attempt to cache the result in storage.
2031 {
2032 if let Err(err) = self
2033 .state_store()
2034 .set_kv_data(
2035 StateStoreDataKey::ServerInfo,
2036 StateStoreDataValue::ServerInfo(server_info.clone()),
2037 )
2038 .await
2039 {
2040 warn!("error when caching server info: {err}");
2041 }
2042 }
2043
2044 Ok(server_info)
2045 }
2046
2047 pub(crate) async fn get_cached_versions(&self) -> Option<SupportedVersions> {
2048 let server_info = &self.inner.caches.server_info;
2049
2050 if let CachedValue::Cached(val) = &server_info.read().await.supported_versions {
2051 Some(val.clone())
2052 } else {
2053 None
2054 }
2055 }
2056
2057 async fn get_or_load_and_cache_server_info<
2058 Value,
2059 MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
2060 >(
2061 &self,
2062 map: MapFunction,
2063 ) -> HttpResult<Value> {
2064 let server_info = &self.inner.caches.server_info;
2065 if let CachedValue::Cached(val) = map(&*server_info.read().await) {
2066 return Ok(val);
2067 }
2068
2069 let mut guarded_server_info = server_info.write().await;
2070 if let CachedValue::Cached(val) = map(&guarded_server_info) {
2071 return Ok(val);
2072 }
2073
2074 let server_info = self.load_or_fetch_server_info().await?;
2075
2076 // Fill both unstable features and server versions at once.
2077 let mut supported = server_info.supported_versions();
2078 if supported.versions.is_empty() {
2079 supported.versions = [MatrixVersion::V1_0].into();
2080 }
2081
2082 guarded_server_info.supported_versions = CachedValue::Cached(supported);
2083 guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2084
2085 // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2086 // fetch an optional property), the function will always return some.
2087 Ok(map(&guarded_server_info).unwrap_cached_value())
2088 }
2089
2090 /// Get the Matrix versions and features supported by the homeserver by
2091 /// fetching them from the server or the cache.
2092 ///
2093 /// This is equivalent to calling both [`Client::server_versions()`] and
2094 /// [`Client::unstable_features()`]. To always fetch the result from the
2095 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2096 /// and then `.as_supported_versions()` on the response.
2097 ///
2098 /// # Examples
2099 ///
2100 /// ```no_run
2101 /// use ruma::api::{FeatureFlag, MatrixVersion};
2102 /// # use matrix_sdk::{Client, config::SyncSettings};
2103 /// # use url::Url;
2104 /// # async {
2105 /// # let homeserver = Url::parse("http://localhost:8080")?;
2106 /// # let mut client = Client::new(homeserver).await?;
2107 ///
2108 /// let supported = client.supported_versions().await?;
2109 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2110 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2111 ///
2112 /// let msc_x_feature = FeatureFlag::from("msc_x");
2113 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2114 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2115 /// # anyhow::Ok(()) };
2116 /// ```
2117 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2118 self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2119 .await
2120 }
2121
2122 /// Get the Matrix versions and features supported by the homeserver by
2123 /// fetching them from the cache.
2124 ///
2125 /// For a version of this function that fetches the supported versions and
2126 /// features from the homeserver if the [`SupportedVersions`] aren't
2127 /// found in the cache, take a look at the [`Client::server_versions()`]
2128 /// method.
2129 ///
2130 /// # Examples
2131 ///
2132 /// ```no_run
2133 /// use ruma::api::{FeatureFlag, MatrixVersion};
2134 /// # use matrix_sdk::{Client, config::SyncSettings};
2135 /// # use url::Url;
2136 /// # async {
2137 /// # let homeserver = Url::parse("http://localhost:8080")?;
2138 /// # let mut client = Client::new(homeserver).await?;
2139 ///
2140 /// let supported =
2141 /// if let Some(supported) = client.supported_versions_cached().await? {
2142 /// supported
2143 /// } else {
2144 /// client.fetch_server_versions(None).await?.as_supported_versions()
2145 /// };
2146 ///
2147 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2148 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2149 ///
2150 /// let msc_x_feature = FeatureFlag::from("msc_x");
2151 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2152 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2153 /// # anyhow::Ok(()) };
2154 /// ```
2155 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2156 if let Some(cached) = self.get_cached_versions().await {
2157 Ok(Some(cached))
2158 } else if let Some(stored) =
2159 self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await?
2160 {
2161 if let Some(server_info) =
2162 stored.into_server_info().and_then(|info| info.maybe_decode())
2163 {
2164 Ok(Some(server_info.supported_versions()))
2165 } else {
2166 Ok(None)
2167 }
2168 } else {
2169 Ok(None)
2170 }
2171 }
2172
2173 /// Get the Matrix versions supported by the homeserver by fetching them
2174 /// from the server or the cache.
2175 ///
2176 /// # Examples
2177 ///
2178 /// ```no_run
2179 /// use ruma::api::MatrixVersion;
2180 /// # use matrix_sdk::{Client, config::SyncSettings};
2181 /// # use url::Url;
2182 /// # async {
2183 /// # let homeserver = Url::parse("http://localhost:8080")?;
2184 /// # let mut client = Client::new(homeserver).await?;
2185 ///
2186 /// let server_versions = client.server_versions().await?;
2187 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2188 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2189 /// # anyhow::Ok(()) };
2190 /// ```
2191 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2192 self.get_or_load_and_cache_server_info(|server_info| {
2193 server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2194 })
2195 .await
2196 }
2197
2198 /// Get the unstable features supported by the homeserver by fetching them
2199 /// from the server or the cache.
2200 ///
2201 /// # Examples
2202 ///
2203 /// ```no_run
2204 /// use matrix_sdk::ruma::api::FeatureFlag;
2205 /// # use matrix_sdk::{Client, config::SyncSettings};
2206 /// # use url::Url;
2207 /// # async {
2208 /// # let homeserver = Url::parse("http://localhost:8080")?;
2209 /// # let mut client = Client::new(homeserver).await?;
2210 ///
2211 /// let msc_x_feature = FeatureFlag::from("msc_x");
2212 /// let unstable_features = client.unstable_features().await?;
2213 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2214 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2215 /// # anyhow::Ok(()) };
2216 /// ```
2217 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2218 self.get_or_load_and_cache_server_info(|server_info| {
2219 server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2220 })
2221 .await
2222 }
2223
2224 /// Get information about the homeserver's advertised RTC foci by fetching
2225 /// the well-known file from the server or the cache.
2226 ///
2227 /// # Examples
2228 /// ```no_run
2229 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2230 /// # use url::Url;
2231 /// # async {
2232 /// # let homeserver = Url::parse("http://localhost:8080")?;
2233 /// # let mut client = Client::new(homeserver).await?;
2234 /// let rtc_foci = client.rtc_foci().await?;
2235 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2236 /// RtcFocusInfo::LiveKit(info) => Some(info),
2237 /// _ => None,
2238 /// });
2239 /// if let Some(info) = default_livekit_focus_info {
2240 /// println!("Default LiveKit service URL: {}", info.service_url);
2241 /// }
2242 /// # anyhow::Ok(()) };
2243 /// ```
2244 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2245 let well_known = self
2246 .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2247 .await?;
2248
2249 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2250 }
2251
2252 /// Empty the server version and unstable features cache.
2253 ///
2254 /// Since the SDK caches server info (versions, unstable features,
2255 /// well-known etc), it's possible to have a stale entry in the cache. This
2256 /// functions makes it possible to force reset it.
2257 pub async fn reset_server_info(&self) -> Result<()> {
2258 // Empty the in-memory caches.
2259 let mut guard = self.inner.caches.server_info.write().await;
2260 guard.supported_versions = CachedValue::NotSet;
2261
2262 // Empty the store cache.
2263 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2264 }
2265
2266 /// Check whether MSC 4028 is enabled on the homeserver.
2267 ///
2268 /// # Examples
2269 ///
2270 /// ```no_run
2271 /// # use matrix_sdk::{Client, config::SyncSettings};
2272 /// # use url::Url;
2273 /// # async {
2274 /// # let homeserver = Url::parse("http://localhost:8080")?;
2275 /// # let mut client = Client::new(homeserver).await?;
2276 /// let msc4028_enabled =
2277 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2278 /// # anyhow::Ok(()) };
2279 /// ```
2280 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2281 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2282 }
2283
2284 /// Get information of all our own devices.
2285 ///
2286 /// # Examples
2287 ///
2288 /// ```no_run
2289 /// # use matrix_sdk::{Client, config::SyncSettings};
2290 /// # use url::Url;
2291 /// # async {
2292 /// # let homeserver = Url::parse("http://localhost:8080")?;
2293 /// # let mut client = Client::new(homeserver).await?;
2294 /// let response = client.devices().await?;
2295 ///
2296 /// for device in response.devices {
2297 /// println!(
2298 /// "Device: {} {}",
2299 /// device.device_id,
2300 /// device.display_name.as_deref().unwrap_or("")
2301 /// );
2302 /// }
2303 /// # anyhow::Ok(()) };
2304 /// ```
2305 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2306 let request = get_devices::v3::Request::new();
2307
2308 self.send(request).await
2309 }
2310
2311 /// Delete the given devices from the server.
2312 ///
2313 /// # Arguments
2314 ///
2315 /// * `devices` - The list of devices that should be deleted from the
2316 /// server.
2317 ///
2318 /// * `auth_data` - This request requires user interactive auth, the first
2319 /// request needs to set this to `None` and will always fail with an
2320 /// `UiaaResponse`. The response will contain information for the
2321 /// interactive auth and the same request needs to be made but this time
2322 /// with some `auth_data` provided.
2323 ///
2324 /// ```no_run
2325 /// # use matrix_sdk::{
2326 /// # ruma::{api::client::uiaa, device_id},
2327 /// # Client, Error, config::SyncSettings,
2328 /// # };
2329 /// # use serde_json::json;
2330 /// # use url::Url;
2331 /// # use std::collections::BTreeMap;
2332 /// # async {
2333 /// # let homeserver = Url::parse("http://localhost:8080")?;
2334 /// # let mut client = Client::new(homeserver).await?;
2335 /// let devices = &[device_id!("DEVICEID").to_owned()];
2336 ///
2337 /// if let Err(e) = client.delete_devices(devices, None).await {
2338 /// if let Some(info) = e.as_uiaa_response() {
2339 /// let mut password = uiaa::Password::new(
2340 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2341 /// "wordpass".to_owned(),
2342 /// );
2343 /// password.session = info.session.clone();
2344 ///
2345 /// client
2346 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2347 /// .await?;
2348 /// }
2349 /// }
2350 /// # anyhow::Ok(()) };
2351 pub async fn delete_devices(
2352 &self,
2353 devices: &[OwnedDeviceId],
2354 auth_data: Option<uiaa::AuthData>,
2355 ) -> HttpResult<delete_devices::v3::Response> {
2356 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2357 request.auth = auth_data;
2358
2359 self.send(request).await
2360 }
2361
2362 /// Change the display name of a device owned by the current user.
2363 ///
2364 /// Returns a `update_device::Response` which specifies the result
2365 /// of the operation.
2366 ///
2367 /// # Arguments
2368 ///
2369 /// * `device_id` - The ID of the device to change the display name of.
2370 /// * `display_name` - The new display name to set.
2371 pub async fn rename_device(
2372 &self,
2373 device_id: &DeviceId,
2374 display_name: &str,
2375 ) -> HttpResult<update_device::v3::Response> {
2376 let mut request = update_device::v3::Request::new(device_id.to_owned());
2377 request.display_name = Some(display_name.to_owned());
2378
2379 self.send(request).await
2380 }
2381
2382 /// Check whether a device with a specific ID exists on the server.
2383 ///
2384 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2385 /// with 404 and the underlying error otherwise.
2386 ///
2387 /// # Arguments
2388 ///
2389 /// * `device_id` - The ID of the device to query.
2390 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2391 let request = device::get_device::v3::Request::new(device_id);
2392 match self.send(request).await {
2393 Ok(_) => Ok(true),
2394 Err(err) => {
2395 if let Some(error) = err.as_client_api_error()
2396 && error.status_code == 404
2397 {
2398 Ok(false)
2399 } else {
2400 Err(err.into())
2401 }
2402 }
2403 }
2404 }
2405
2406 /// Synchronize the client's state with the latest state on the server.
2407 ///
2408 /// ## Syncing Events
2409 ///
2410 /// Messages or any other type of event need to be periodically fetched from
2411 /// the server, this is achieved by sending a `/sync` request to the server.
2412 ///
2413 /// The first sync is sent out without a [`token`]. The response of the
2414 /// first sync will contain a [`next_batch`] field which should then be
2415 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2416 /// don't receive the same events multiple times.
2417 ///
2418 /// ## Long Polling
2419 ///
2420 /// A sync should in the usual case always be in flight. The
2421 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2422 /// long the server will wait for new events before it will respond.
2423 /// The server will respond immediately if some new events arrive before the
2424 /// timeout has expired. If no changes arrive and the timeout expires an
2425 /// empty sync response will be sent to the client.
2426 ///
2427 /// This method of sending a request that may not receive a response
2428 /// immediately is called long polling.
2429 ///
2430 /// ## Filtering Events
2431 ///
2432 /// The number or type of messages and events that the client should receive
2433 /// from the server can be altered using a [`Filter`].
2434 ///
2435 /// Filters can be non-trivial and, since they will be sent with every sync
2436 /// request, they may take up a bunch of unnecessary bandwidth.
2437 ///
2438 /// Luckily filters can be uploaded to the server and reused using an unique
2439 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2440 /// method.
2441 ///
2442 /// # Arguments
2443 ///
2444 /// * `sync_settings` - Settings for the sync call, this allows us to set
2445 /// various options to configure the sync:
2446 /// * [`filter`] - To configure which events we receive and which get
2447 /// [filtered] by the server
2448 /// * [`timeout`] - To configure our [long polling] setup.
2449 /// * [`token`] - To tell the server which events we already received
2450 /// and where we wish to continue syncing.
2451 /// * [`full_state`] - To tell the server that we wish to receive all
2452 /// state events, regardless of our configured [`token`].
2453 /// * [`set_presence`] - To tell the server to set the presence and to
2454 /// which state.
2455 ///
2456 /// # Examples
2457 ///
2458 /// ```no_run
2459 /// # use url::Url;
2460 /// # async {
2461 /// # let homeserver = Url::parse("http://localhost:8080")?;
2462 /// # let username = "";
2463 /// # let password = "";
2464 /// use matrix_sdk::{
2465 /// Client, config::SyncSettings,
2466 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2467 /// };
2468 ///
2469 /// let client = Client::new(homeserver).await?;
2470 /// client.matrix_auth().login_username(username, password).send().await?;
2471 ///
2472 /// // Sync once so we receive the client state and old messages.
2473 /// client.sync_once(SyncSettings::default()).await?;
2474 ///
2475 /// // Register our handler so we start responding once we receive a new
2476 /// // event.
2477 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2478 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2479 /// });
2480 ///
2481 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2482 /// // from our `sync_once()` call automatically.
2483 /// client.sync(SyncSettings::default()).await;
2484 /// # anyhow::Ok(()) };
2485 /// ```
2486 ///
2487 /// [`sync`]: #method.sync
2488 /// [`SyncSettings`]: crate::config::SyncSettings
2489 /// [`token`]: crate::config::SyncSettings#method.token
2490 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2491 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2492 /// [`set_presence`]: ruma::presence::PresenceState
2493 /// [`filter`]: crate::config::SyncSettings#method.filter
2494 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2495 /// [`next_batch`]: SyncResponse#structfield.next_batch
2496 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2497 /// [long polling]: #long-polling
2498 /// [filtered]: #filtering-events
2499 #[instrument(skip(self))]
2500 pub async fn sync_once(
2501 &self,
2502 sync_settings: crate::config::SyncSettings,
2503 ) -> Result<SyncResponse> {
2504 // The sync might not return for quite a while due to the timeout.
2505 // We'll see if there's anything crypto related to send out before we
2506 // sync, i.e. if we closed our client after a sync but before the
2507 // crypto requests were sent out.
2508 //
2509 // This will mostly be a no-op.
2510 #[cfg(feature = "e2e-encryption")]
2511 if let Err(e) = self.send_outgoing_requests().await {
2512 error!(error = ?e, "Error while sending outgoing E2EE requests");
2513 }
2514
2515 let token = match sync_settings.token {
2516 SyncToken::Specific(token) => Some(token),
2517 SyncToken::NoToken => None,
2518 SyncToken::ReusePrevious => self.sync_token().await,
2519 };
2520
2521 let request = assign!(sync_events::v3::Request::new(), {
2522 filter: sync_settings.filter.map(|f| *f),
2523 since: token,
2524 full_state: sync_settings.full_state,
2525 set_presence: sync_settings.set_presence,
2526 timeout: sync_settings.timeout,
2527 use_state_after: true,
2528 });
2529 let mut request_config = self.request_config();
2530 if let Some(timeout) = sync_settings.timeout {
2531 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2532 request_config.timeout = Some(base_timeout + timeout);
2533 }
2534
2535 let response = self.send(request).with_request_config(request_config).await?;
2536 let next_batch = response.next_batch.clone();
2537 let response = self.process_sync(response).await?;
2538
2539 #[cfg(feature = "e2e-encryption")]
2540 if let Err(e) = self.send_outgoing_requests().await {
2541 error!(error = ?e, "Error while sending outgoing E2EE requests");
2542 }
2543
2544 self.inner.sync_beat.notify(usize::MAX);
2545
2546 Ok(SyncResponse::new(next_batch, response))
2547 }
2548
2549 /// Repeatedly synchronize the client state with the server.
2550 ///
2551 /// This method will only return on error, if cancellation is needed
2552 /// the method should be wrapped in a cancelable task or the
2553 /// [`Client::sync_with_callback`] method can be used or
2554 /// [`Client::sync_with_result_callback`] if you want to handle error
2555 /// cases in the loop, too.
2556 ///
2557 /// This method will internally call [`Client::sync_once`] in a loop.
2558 ///
2559 /// This method can be used with the [`Client::add_event_handler`]
2560 /// method to react to individual events. If you instead wish to handle
2561 /// events in a bulk manner the [`Client::sync_with_callback`],
2562 /// [`Client::sync_with_result_callback`] and
2563 /// [`Client::sync_stream`] methods can be used instead. Those methods
2564 /// repeatedly return the whole sync response.
2565 ///
2566 /// # Arguments
2567 ///
2568 /// * `sync_settings` - Settings for the sync call. *Note* that those
2569 /// settings will be only used for the first sync call. See the argument
2570 /// docs for [`Client::sync_once`] for more info.
2571 ///
2572 /// # Return
2573 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2574 /// up to the user of the API to check the error and decide whether the sync
2575 /// should continue or not.
2576 ///
2577 /// # Examples
2578 ///
2579 /// ```no_run
2580 /// # use url::Url;
2581 /// # async {
2582 /// # let homeserver = Url::parse("http://localhost:8080")?;
2583 /// # let username = "";
2584 /// # let password = "";
2585 /// use matrix_sdk::{
2586 /// Client, config::SyncSettings,
2587 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2588 /// };
2589 ///
2590 /// let client = Client::new(homeserver).await?;
2591 /// client.matrix_auth().login_username(&username, &password).send().await?;
2592 ///
2593 /// // Register our handler so we start responding once we receive a new
2594 /// // event.
2595 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2596 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2597 /// });
2598 ///
2599 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2600 /// // automatically.
2601 /// client.sync(SyncSettings::default()).await?;
2602 /// # anyhow::Ok(()) };
2603 /// ```
2604 ///
2605 /// [argument docs]: #method.sync_once
2606 /// [`sync_with_callback`]: #method.sync_with_callback
2607 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2608 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2609 }
2610
2611 /// Repeatedly call sync to synchronize the client state with the server.
2612 ///
2613 /// # Arguments
2614 ///
2615 /// * `sync_settings` - Settings for the sync call. *Note* that those
2616 /// settings will be only used for the first sync call. See the argument
2617 /// docs for [`Client::sync_once`] for more info.
2618 ///
2619 /// * `callback` - A callback that will be called every time a successful
2620 /// response has been fetched from the server. The callback must return a
2621 /// boolean which signalizes if the method should stop syncing. If the
2622 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2623 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2624 ///
2625 /// # Return
2626 /// The sync runs until an error occurs or the
2627 /// callback indicates that the Loop should stop. If the callback asked for
2628 /// a regular stop, the result will be `Ok(())` otherwise the
2629 /// `Err(Error)` is returned.
2630 ///
2631 /// # Examples
2632 ///
2633 /// The following example demonstrates how to sync forever while sending all
2634 /// the interesting events through a mpsc channel to another thread e.g. a
2635 /// UI thread.
2636 ///
2637 /// ```no_run
2638 /// # use std::time::Duration;
2639 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2640 /// # use url::Url;
2641 /// # async {
2642 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2643 /// # let mut client = Client::new(homeserver).await.unwrap();
2644 ///
2645 /// use tokio::sync::mpsc::channel;
2646 ///
2647 /// let (tx, rx) = channel(100);
2648 ///
2649 /// let sync_channel = &tx;
2650 /// let sync_settings = SyncSettings::new()
2651 /// .timeout(Duration::from_secs(30));
2652 ///
2653 /// client
2654 /// .sync_with_callback(sync_settings, |response| async move {
2655 /// let channel = sync_channel;
2656 /// for (room_id, room) in response.rooms.joined {
2657 /// for event in room.timeline.events {
2658 /// channel.send(event).await.unwrap();
2659 /// }
2660 /// }
2661 ///
2662 /// LoopCtrl::Continue
2663 /// })
2664 /// .await;
2665 /// };
2666 /// ```
2667 #[instrument(skip_all)]
2668 pub async fn sync_with_callback<C>(
2669 &self,
2670 sync_settings: crate::config::SyncSettings,
2671 callback: impl Fn(SyncResponse) -> C,
2672 ) -> Result<(), Error>
2673 where
2674 C: Future<Output = LoopCtrl>,
2675 {
2676 self.sync_with_result_callback(sync_settings, |result| async {
2677 Ok(callback(result?).await)
2678 })
2679 .await
2680 }
2681
2682 /// Repeatedly call sync to synchronize the client state with the server.
2683 ///
2684 /// # Arguments
2685 ///
2686 /// * `sync_settings` - Settings for the sync call. *Note* that those
2687 /// settings will be only used for the first sync call. See the argument
2688 /// docs for [`Client::sync_once`] for more info.
2689 ///
2690 /// * `callback` - A callback that will be called every time after a
2691 /// response has been received, failure or not. The callback returns a
2692 /// `Result<LoopCtrl, Error>`, too. When returning
2693 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2694 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2695 /// function returns `Ok(())`. In case the callback can't handle the
2696 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2697 /// which results in the sync ending and the `Err(Error)` being returned.
2698 ///
2699 /// # Return
2700 /// The sync runs until an error occurs that the callback can't handle or
2701 /// the callback indicates that the Loop should stop. If the callback
2702 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2703 /// `Err(Error)` is returned.
2704 ///
2705 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2706 /// this, and are handled first without sending the result to the
2707 /// callback. Only after they have exceeded is the `Result` handed to
2708 /// the callback.
2709 ///
2710 /// # Examples
2711 ///
2712 /// The following example demonstrates how to sync forever while sending all
2713 /// the interesting events through a mpsc channel to another thread e.g. a
2714 /// UI thread.
2715 ///
2716 /// ```no_run
2717 /// # use std::time::Duration;
2718 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2719 /// # use url::Url;
2720 /// # async {
2721 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2722 /// # let mut client = Client::new(homeserver).await.unwrap();
2723 /// #
2724 /// use tokio::sync::mpsc::channel;
2725 ///
2726 /// let (tx, rx) = channel(100);
2727 ///
2728 /// let sync_channel = &tx;
2729 /// let sync_settings = SyncSettings::new()
2730 /// .timeout(Duration::from_secs(30));
2731 ///
2732 /// client
2733 /// .sync_with_result_callback(sync_settings, |response| async move {
2734 /// let channel = sync_channel;
2735 /// let sync_response = response?;
2736 /// for (room_id, room) in sync_response.rooms.joined {
2737 /// for event in room.timeline.events {
2738 /// channel.send(event).await.unwrap();
2739 /// }
2740 /// }
2741 ///
2742 /// Ok(LoopCtrl::Continue)
2743 /// })
2744 /// .await;
2745 /// };
2746 /// ```
2747 #[instrument(skip(self, callback))]
2748 pub async fn sync_with_result_callback<C>(
2749 &self,
2750 sync_settings: crate::config::SyncSettings,
2751 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2752 ) -> Result<(), Error>
2753 where
2754 C: Future<Output = Result<LoopCtrl, Error>>,
2755 {
2756 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2757
2758 while let Some(result) = sync_stream.next().await {
2759 trace!("Running callback");
2760 if callback(result).await? == LoopCtrl::Break {
2761 trace!("Callback told us to stop");
2762 break;
2763 }
2764 trace!("Done running callback");
2765 }
2766
2767 Ok(())
2768 }
2769
2770 //// Repeatedly synchronize the client state with the server.
2771 ///
2772 /// This method will internally call [`Client::sync_once`] in a loop and is
2773 /// equivalent to the [`Client::sync`] method but the responses are provided
2774 /// as an async stream.
2775 ///
2776 /// # Arguments
2777 ///
2778 /// * `sync_settings` - Settings for the sync call. *Note* that those
2779 /// settings will be only used for the first sync call. See the argument
2780 /// docs for [`Client::sync_once`] for more info.
2781 ///
2782 /// # Examples
2783 ///
2784 /// ```no_run
2785 /// # use url::Url;
2786 /// # async {
2787 /// # let homeserver = Url::parse("http://localhost:8080")?;
2788 /// # let username = "";
2789 /// # let password = "";
2790 /// use futures_util::StreamExt;
2791 /// use matrix_sdk::{Client, config::SyncSettings};
2792 ///
2793 /// let client = Client::new(homeserver).await?;
2794 /// client.matrix_auth().login_username(&username, &password).send().await?;
2795 ///
2796 /// let mut sync_stream =
2797 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2798 ///
2799 /// while let Some(Ok(response)) = sync_stream.next().await {
2800 /// for room in response.rooms.joined.values() {
2801 /// for e in &room.timeline.events {
2802 /// if let Ok(event) = e.raw().deserialize() {
2803 /// println!("Received event {:?}", event);
2804 /// }
2805 /// }
2806 /// }
2807 /// }
2808 ///
2809 /// # anyhow::Ok(()) };
2810 /// ```
2811 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2812 #[instrument(skip(self))]
2813 pub async fn sync_stream(
2814 &self,
2815 mut sync_settings: crate::config::SyncSettings,
2816 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2817 let mut is_first_sync = true;
2818 let mut timeout = None;
2819 let mut last_sync_time: Option<Instant> = None;
2820
2821 let parent_span = Span::current();
2822
2823 async_stream::stream!({
2824 loop {
2825 trace!("Syncing");
2826
2827 if sync_settings.ignore_timeout_on_first_sync {
2828 if is_first_sync {
2829 timeout = sync_settings.timeout.take();
2830 } else if sync_settings.timeout.is_none() && timeout.is_some() {
2831 sync_settings.timeout = timeout.take();
2832 }
2833
2834 is_first_sync = false;
2835 }
2836
2837 yield self
2838 .sync_loop_helper(&mut sync_settings)
2839 .instrument(parent_span.clone())
2840 .await;
2841
2842 Client::delay_sync(&mut last_sync_time).await
2843 }
2844 })
2845 }
2846
2847 /// Get the current, if any, sync token of the client.
2848 /// This will be None if the client didn't sync at least once.
2849 pub(crate) async fn sync_token(&self) -> Option<String> {
2850 self.inner.base_client.sync_token().await
2851 }
2852
2853 /// Gets information about the owner of a given access token.
2854 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2855 let request = whoami::v3::Request::new();
2856 self.send(request).await
2857 }
2858
2859 /// Subscribes a new receiver to client SessionChange broadcasts.
2860 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2861 let broadcast = &self.auth_ctx().session_change_sender;
2862 broadcast.subscribe()
2863 }
2864
2865 /// Sets the save/restore session callbacks.
2866 ///
2867 /// This is another mechanism to get synchronous updates to session tokens,
2868 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2869 pub fn set_session_callbacks(
2870 &self,
2871 reload_session_callback: Box<ReloadSessionCallback>,
2872 save_session_callback: Box<SaveSessionCallback>,
2873 ) -> Result<()> {
2874 self.inner
2875 .auth_ctx
2876 .reload_session_callback
2877 .set(reload_session_callback)
2878 .map_err(|_| Error::MultipleSessionCallbacks)?;
2879
2880 self.inner
2881 .auth_ctx
2882 .save_session_callback
2883 .set(save_session_callback)
2884 .map_err(|_| Error::MultipleSessionCallbacks)?;
2885
2886 Ok(())
2887 }
2888
2889 /// Get the notification settings of the current owner of the client.
2890 pub async fn notification_settings(&self) -> NotificationSettings {
2891 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2892 NotificationSettings::new(self.clone(), ruleset)
2893 }
2894
2895 /// Create a new specialized `Client` that can process notifications.
2896 ///
2897 /// See [`CrossProcessLock::new`] to learn more about
2898 /// `cross_process_store_locks_holder_name`.
2899 ///
2900 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
2901 pub async fn notification_client(
2902 &self,
2903 cross_process_store_locks_holder_name: String,
2904 ) -> Result<Client> {
2905 let client = Client {
2906 inner: ClientInner::new(
2907 self.inner.auth_ctx.clone(),
2908 self.server().cloned(),
2909 self.homeserver(),
2910 self.sliding_sync_version(),
2911 self.inner.http_client.clone(),
2912 self.inner
2913 .base_client
2914 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2915 .await?,
2916 self.inner.caches.server_info.read().await.clone(),
2917 self.inner.respect_login_well_known,
2918 self.inner.event_cache.clone(),
2919 self.inner.send_queue_data.clone(),
2920 self.inner.latest_events.clone(),
2921 #[cfg(feature = "e2e-encryption")]
2922 self.inner.e2ee.encryption_settings,
2923 #[cfg(feature = "e2e-encryption")]
2924 self.inner.enable_share_history_on_invite,
2925 cross_process_store_locks_holder_name,
2926 #[cfg(feature = "experimental-search")]
2927 self.inner.search_index.clone(),
2928 self.inner.thread_subscription_catchup.clone(),
2929 )
2930 .await,
2931 };
2932
2933 Ok(client)
2934 }
2935
2936 /// The [`EventCache`] instance for this [`Client`].
2937 pub fn event_cache(&self) -> &EventCache {
2938 // SAFETY: always initialized in the `Client` ctor.
2939 self.inner.event_cache.get().unwrap()
2940 }
2941
2942 /// The [`LatestEvents`] instance for this [`Client`].
2943 pub async fn latest_events(&self) -> &LatestEvents {
2944 self.inner
2945 .latest_events
2946 .get_or_init(|| async {
2947 LatestEvents::new(
2948 WeakClient::from_client(self),
2949 self.event_cache().clone(),
2950 SendQueue::new(self.clone()),
2951 )
2952 })
2953 .await
2954 }
2955
2956 /// Waits until an at least partially synced room is received, and returns
2957 /// it.
2958 ///
2959 /// **Note: this function will loop endlessly until either it finds the room
2960 /// or an externally set timeout happens.**
2961 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2962 loop {
2963 if let Some(room) = self.get_room(room_id) {
2964 if room.is_state_partially_or_fully_synced() {
2965 debug!("Found just created room!");
2966 return room;
2967 }
2968 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2969 } else {
2970 debug!("Room wasn't found, waiting for sync beat to try again");
2971 }
2972 self.inner.sync_beat.listen().await;
2973 }
2974 }
2975
2976 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2977 /// join it.
2978 pub async fn knock(
2979 &self,
2980 room_id_or_alias: OwnedRoomOrAliasId,
2981 reason: Option<String>,
2982 server_names: Vec<OwnedServerName>,
2983 ) -> Result<Room> {
2984 let request =
2985 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2986 let response = self.send(request).await?;
2987 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2988 Ok(Room::new(self.clone(), base_room))
2989 }
2990
2991 /// Checks whether the provided `user_id` belongs to an ignored user.
2992 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2993 self.base_client().is_user_ignored(user_id).await
2994 }
2995
2996 /// Gets the `max_upload_size` value from the homeserver, getting either a
2997 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2998 /// missing.
2999 ///
3000 /// Check the spec for more info:
3001 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3002 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3003 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3004 if let Some(data) = max_upload_size_lock.get() {
3005 return Ok(data.to_owned());
3006 }
3007
3008 // Use the authenticated endpoint when the server supports it.
3009 let supported_versions = self.supported_versions().await?;
3010 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3011 .is_supported(&supported_versions);
3012
3013 let upload_size = if use_auth {
3014 self.send(authenticated_media::get_media_config::v1::Request::default())
3015 .await?
3016 .upload_size
3017 } else {
3018 #[allow(deprecated)]
3019 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3020 };
3021
3022 match max_upload_size_lock.set(upload_size) {
3023 Ok(_) => Ok(upload_size),
3024 Err(error) => {
3025 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3026 }
3027 }
3028 }
3029
3030 /// The settings to use for decrypting events.
3031 #[cfg(feature = "e2e-encryption")]
3032 pub fn decryption_settings(&self) -> &DecryptionSettings {
3033 &self.base_client().decryption_settings
3034 }
3035
3036 /// Returns the [`SearchIndex`] for this [`Client`].
3037 #[cfg(feature = "experimental-search")]
3038 pub fn search_index(&self) -> &SearchIndex {
3039 &self.inner.search_index
3040 }
3041
3042 /// Whether the client is configured to take thread subscriptions (MSC4306
3043 /// and MSC4308) into account.
3044 ///
3045 /// This may cause filtering out of thread subscriptions, and loading the
3046 /// thread subscriptions via the sliding sync extension, when the room
3047 /// list service is being used.
3048 pub fn enabled_thread_subscriptions(&self) -> bool {
3049 match self.base_client().threading_support {
3050 ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
3051 ThreadingSupport::Disabled => false,
3052 }
3053 }
3054
3055 /// Fetch thread subscriptions changes between `from` and up to `to`.
3056 ///
3057 /// The `limit` optional parameter can be used to limit the number of
3058 /// entries in a response. It can also be overridden by the server, if
3059 /// it's deemed too large.
3060 pub async fn fetch_thread_subscriptions(
3061 &self,
3062 from: Option<String>,
3063 to: Option<String>,
3064 limit: Option<UInt>,
3065 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3066 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3067 from,
3068 to,
3069 limit,
3070 });
3071 Ok(self.send(request).await?)
3072 }
3073
3074 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3075 self.inner.thread_subscription_catchup.get().unwrap()
3076 }
3077}
3078
3079#[cfg(any(feature = "testing", test))]
3080impl Client {
3081 /// Test helper to mark users as tracked by the crypto layer.
3082 #[cfg(feature = "e2e-encryption")]
3083 pub async fn update_tracked_users_for_testing(
3084 &self,
3085 user_ids: impl IntoIterator<Item = &UserId>,
3086 ) {
3087 let olm = self.olm_machine().await;
3088 let olm = olm.as_ref().unwrap();
3089 olm.update_tracked_users(user_ids).await.unwrap();
3090 }
3091}
3092
3093/// A weak reference to the inner client, useful when trying to get a handle
3094/// on the owning client.
3095#[derive(Clone, Debug)]
3096pub(crate) struct WeakClient {
3097 client: Weak<ClientInner>,
3098}
3099
3100impl WeakClient {
3101 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3102 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3103 Self { client: Arc::downgrade(client) }
3104 }
3105
3106 /// Construct a [`WeakClient`] from a [`Client`].
3107 pub fn from_client(client: &Client) -> Self {
3108 Self::from_inner(&client.inner)
3109 }
3110
3111 /// Attempts to get a [`Client`] from this [`WeakClient`].
3112 pub fn get(&self) -> Option<Client> {
3113 self.client.upgrade().map(|inner| Client { inner })
3114 }
3115
3116 /// Gets the number of strong (`Arc`) pointers still pointing to this
3117 /// client.
3118 #[allow(dead_code)]
3119 pub fn strong_count(&self) -> usize {
3120 self.client.strong_count()
3121 }
3122}
3123
3124#[derive(Clone)]
3125struct ClientServerInfo {
3126 /// The Matrix versions and unstable features the server supports (known
3127 /// ones only).
3128 supported_versions: CachedValue<SupportedVersions>,
3129
3130 /// The server's well-known file, if any.
3131 well_known: CachedValue<Option<WellKnownResponse>>,
3132}
3133
3134/// A cached value that can either be set or not set, used to avoid confusion
3135/// between a value that is set to `None` (because it doesn't exist) and a value
3136/// that has not been cached yet.
3137#[derive(Clone)]
3138enum CachedValue<Value> {
3139 /// A value has been cached.
3140 Cached(Value),
3141 /// Nothing has been cached yet.
3142 NotSet,
3143}
3144
3145impl<Value> CachedValue<Value> {
3146 /// Unwraps the cached value, returning it if it exists.
3147 ///
3148 /// # Panics
3149 ///
3150 /// If the cached value is not set, this will panic.
3151 fn unwrap_cached_value(self) -> Value {
3152 match self {
3153 CachedValue::Cached(value) => value,
3154 CachedValue::NotSet => panic!("Tried to unwrap a cached value that wasn't set"),
3155 }
3156 }
3157
3158 /// Converts from `&CachedValue<Value>` to `CachedValue<&Value>`.
3159 fn as_ref(&self) -> CachedValue<&Value> {
3160 match self {
3161 Self::Cached(value) => CachedValue::Cached(value),
3162 Self::NotSet => CachedValue::NotSet,
3163 }
3164 }
3165
3166 /// Maps a `CachedValue<Value>` to `CachedValue<Other>` by applying a
3167 /// function to a contained value (if `Cached`) or returns `NotSet` (if
3168 /// `NotSet`).
3169 fn map<Other, F>(self, f: F) -> CachedValue<Other>
3170 where
3171 F: FnOnce(Value) -> Other,
3172 {
3173 match self {
3174 Self::Cached(value) => CachedValue::Cached(f(value)),
3175 Self::NotSet => CachedValue::NotSet,
3176 }
3177 }
3178}
3179
3180/// Information about the state of a room before we joined it.
3181#[derive(Debug, Clone, Default)]
3182struct PreJoinRoomInfo {
3183 /// The user who invited us to the room, if any.
3184 pub inviter: Option<RoomMember>,
3185}
3186
3187// The http mocking library is not supported for wasm32
3188#[cfg(all(test, not(target_family = "wasm")))]
3189pub(crate) mod tests {
3190 use std::{sync::Arc, time::Duration};
3191
3192 use assert_matches::assert_matches;
3193 use assert_matches2::assert_let;
3194 use eyeball::SharedObservable;
3195 use futures_util::{FutureExt, StreamExt, pin_mut};
3196 use js_int::{UInt, uint};
3197 use matrix_sdk_base::{
3198 RoomState,
3199 store::{MemoryStore, StoreConfig},
3200 };
3201 use matrix_sdk_test::{
3202 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3203 event_factory::EventFactory,
3204 };
3205 #[cfg(target_family = "wasm")]
3206 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3207
3208 use ruma::{
3209 RoomId, ServerName, UserId,
3210 api::{
3211 FeatureFlag, MatrixVersion,
3212 client::{
3213 discovery::discover_homeserver::RtcFocusInfo,
3214 room::create_room::v3::Request as CreateRoomRequest,
3215 },
3216 },
3217 assign,
3218 events::{
3219 ignored_user_list::IgnoredUserListEventContent,
3220 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3221 },
3222 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3223 };
3224 use serde_json::json;
3225 use stream_assert::{assert_next_matches, assert_pending};
3226 use tokio::{
3227 spawn,
3228 time::{sleep, timeout},
3229 };
3230 use url::Url;
3231
3232 use super::Client;
3233 use crate::{
3234 Error, TransmissionProgress,
3235 client::{WeakClient, futures::SendMediaUploadRequest},
3236 config::{RequestConfig, SyncSettings},
3237 futures::SendRequest,
3238 media::MediaError,
3239 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3240 };
3241
3242 #[async_test]
3243 async fn test_account_data() {
3244 let server = MatrixMockServer::new().await;
3245 let client = server.client_builder().build().await;
3246
3247 let f = EventFactory::new();
3248 server
3249 .mock_sync()
3250 .ok_and_run(&client, |builder| {
3251 builder.add_global_account_data(
3252 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3253 );
3254 })
3255 .await;
3256
3257 let content = client
3258 .account()
3259 .account_data::<IgnoredUserListEventContent>()
3260 .await
3261 .unwrap()
3262 .unwrap()
3263 .deserialize()
3264 .unwrap();
3265
3266 assert_eq!(content.ignored_users.len(), 1);
3267 }
3268
3269 #[async_test]
3270 async fn test_successful_discovery() {
3271 // Imagine this is `matrix.org`.
3272 let server = MatrixMockServer::new().await;
3273 let server_url = server.uri();
3274
3275 // Imagine this is `matrix-client.matrix.org`.
3276 let homeserver = MatrixMockServer::new().await;
3277 let homeserver_url = homeserver.uri();
3278
3279 // Imagine Alice has the user ID `@alice:matrix.org`.
3280 let domain = server_url.strip_prefix("http://").unwrap();
3281 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3282
3283 // The `.well-known` is on the server (e.g. `matrix.org`).
3284 server
3285 .mock_well_known()
3286 .ok_with_homeserver_url(&homeserver_url)
3287 .mock_once()
3288 .named("well-known")
3289 .mount()
3290 .await;
3291
3292 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3293 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3294
3295 let client = Client::builder()
3296 .insecure_server_name_no_tls(alice.server_name())
3297 .build()
3298 .await
3299 .unwrap();
3300
3301 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3302 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3303 client.server_versions().await.unwrap();
3304 }
3305
3306 #[async_test]
3307 async fn test_discovery_broken_server() {
3308 let server = MatrixMockServer::new().await;
3309 let server_url = server.uri();
3310 let domain = server_url.strip_prefix("http://").unwrap();
3311 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3312
3313 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3314
3315 assert!(
3316 Client::builder()
3317 .insecure_server_name_no_tls(alice.server_name())
3318 .build()
3319 .await
3320 .is_err(),
3321 "Creating a client from a user ID should fail when the .well-known request fails."
3322 );
3323 }
3324
3325 #[async_test]
3326 async fn test_room_creation() {
3327 let server = MatrixMockServer::new().await;
3328 let client = server.client_builder().build().await;
3329
3330 server
3331 .mock_sync()
3332 .ok_and_run(&client, |builder| {
3333 builder.add_joined_room(
3334 JoinedRoomBuilder::default()
3335 .add_state_event(StateTestEvent::Member)
3336 .add_state_event(StateTestEvent::PowerLevels),
3337 );
3338 })
3339 .await;
3340
3341 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3342 assert_eq!(room.state(), RoomState::Joined);
3343 }
3344
3345 #[async_test]
3346 async fn test_retry_limit_http_requests() {
3347 let server = MatrixMockServer::new().await;
3348 let client = server
3349 .client_builder()
3350 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3351 .build()
3352 .await;
3353
3354 assert!(client.request_config().retry_limit.unwrap() == 3);
3355
3356 server.mock_login().error500().expect(3).mount().await;
3357
3358 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3359 }
3360
3361 #[async_test]
3362 async fn test_retry_timeout_http_requests() {
3363 // Keep this timeout small so that the test doesn't take long
3364 let retry_timeout = Duration::from_secs(5);
3365 let server = MatrixMockServer::new().await;
3366 let client = server
3367 .client_builder()
3368 .on_builder(|builder| {
3369 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3370 })
3371 .build()
3372 .await;
3373
3374 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3375
3376 server.mock_login().error500().expect(2..).mount().await;
3377
3378 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3379 }
3380
3381 #[async_test]
3382 async fn test_short_retry_initial_http_requests() {
3383 let server = MatrixMockServer::new().await;
3384 let client = server
3385 .client_builder()
3386 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3387 .build()
3388 .await;
3389
3390 server.mock_login().error500().expect(3..).mount().await;
3391
3392 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3393 }
3394
3395 #[async_test]
3396 async fn test_no_retry_http_requests() {
3397 let server = MatrixMockServer::new().await;
3398 let client = server.client_builder().build().await;
3399
3400 server.mock_devices().error500().mock_once().mount().await;
3401
3402 client.devices().await.unwrap_err();
3403 }
3404
3405 #[async_test]
3406 async fn test_set_homeserver() {
3407 let client = MockClientBuilder::new(None).build().await;
3408 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3409
3410 let homeserver = Url::parse("http://example.com/").unwrap();
3411 client.set_homeserver(homeserver.clone());
3412 assert_eq!(client.homeserver(), homeserver);
3413 }
3414
3415 #[async_test]
3416 async fn test_search_user_request() {
3417 let server = MatrixMockServer::new().await;
3418 let client = server.client_builder().build().await;
3419
3420 server.mock_user_directory().ok().mock_once().mount().await;
3421
3422 let response = client.search_users("test", 50).await.unwrap();
3423 assert_eq!(response.results.len(), 1);
3424 let result = &response.results[0];
3425 assert_eq!(result.user_id.to_string(), "@test:example.me");
3426 assert_eq!(result.display_name.clone().unwrap(), "Test");
3427 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3428 assert!(!response.limited);
3429 }
3430
3431 #[async_test]
3432 async fn test_request_unstable_features() {
3433 let server = MatrixMockServer::new().await;
3434 let client = server.client_builder().no_server_versions().build().await;
3435
3436 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3437
3438 let unstable_features = client.unstable_features().await.unwrap();
3439 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3440 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3441 }
3442
3443 #[async_test]
3444 async fn test_can_homeserver_push_encrypted_event_to_device() {
3445 let server = MatrixMockServer::new().await;
3446 let client = server.client_builder().no_server_versions().build().await;
3447
3448 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3449
3450 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3451 assert!(msc4028_enabled);
3452 }
3453
3454 #[async_test]
3455 async fn test_recently_visited_rooms() {
3456 // Tracking recently visited rooms requires authentication
3457 let client = MockClientBuilder::new(None).unlogged().build().await;
3458 assert_matches!(
3459 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3460 Err(Error::AuthenticationRequired)
3461 );
3462
3463 let client = MockClientBuilder::new(None).build().await;
3464 let account = client.account();
3465
3466 // We should start off with an empty list
3467 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3468
3469 // Tracking a valid room id should add it to the list
3470 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3471 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3472 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3473
3474 // And the existing list shouldn't be changed
3475 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3476 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3477
3478 // Tracking the same room again shouldn't change the list
3479 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3480 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3481 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3482
3483 // Tracking a second room should add it to the front of the list
3484 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3485 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3486 assert_eq!(
3487 account.get_recently_visited_rooms().await.unwrap(),
3488 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3489 );
3490
3491 // Tracking the first room yet again should move it to the front of the list
3492 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3493 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3494 assert_eq!(
3495 account.get_recently_visited_rooms().await.unwrap(),
3496 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3497 );
3498
3499 // Tracking should be capped at 20
3500 for n in 0..20 {
3501 account
3502 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3503 .await
3504 .unwrap();
3505 }
3506
3507 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3508
3509 // And the initial rooms should've been pushed out
3510 let rooms = account.get_recently_visited_rooms().await.unwrap();
3511 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3512 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3513
3514 // And the last tracked room should be the first
3515 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3516 }
3517
3518 #[async_test]
3519 async fn test_client_no_cycle_with_event_cache() {
3520 let client = MockClientBuilder::new(None).build().await;
3521
3522 // Wait for the init tasks to die.
3523 sleep(Duration::from_secs(1)).await;
3524
3525 let weak_client = WeakClient::from_client(&client);
3526 assert_eq!(weak_client.strong_count(), 1);
3527
3528 {
3529 let room_id = room_id!("!room:example.org");
3530
3531 // Have the client know the room.
3532 let response = SyncResponseBuilder::default()
3533 .add_joined_room(JoinedRoomBuilder::new(room_id))
3534 .build_sync_response();
3535 client.inner.base_client.receive_sync_response(response).await.unwrap();
3536
3537 client.event_cache().subscribe().unwrap();
3538
3539 let (_room_event_cache, _drop_handles) =
3540 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3541 }
3542
3543 drop(client);
3544
3545 // Give a bit of time for background tasks to die.
3546 sleep(Duration::from_secs(1)).await;
3547
3548 // The weak client must be the last reference to the client now.
3549 assert_eq!(weak_client.strong_count(), 0);
3550 let client = weak_client.get();
3551 assert!(
3552 client.is_none(),
3553 "too many strong references to the client: {}",
3554 Arc::strong_count(&client.unwrap().inner)
3555 );
3556 }
3557
3558 #[async_test]
3559 async fn test_server_info_caching() {
3560 let server = MatrixMockServer::new().await;
3561 let server_url = server.uri();
3562 let domain = server_url.strip_prefix("http://").unwrap();
3563 let server_name = <&ServerName>::try_from(domain).unwrap();
3564 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3565
3566 let well_known_mock = server
3567 .mock_well_known()
3568 .ok()
3569 .named("well known mock")
3570 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3571 .mount_as_scoped()
3572 .await;
3573
3574 let versions_mock = server
3575 .mock_versions()
3576 .ok_with_unstable_features()
3577 .named("first versions mock")
3578 .expect(1)
3579 .mount_as_scoped()
3580 .await;
3581
3582 let memory_store = Arc::new(MemoryStore::new());
3583 let client = Client::builder()
3584 .insecure_server_name_no_tls(server_name)
3585 .store_config(
3586 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3587 .state_store(memory_store.clone()),
3588 )
3589 .build()
3590 .await
3591 .unwrap();
3592
3593 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3594
3595 // These subsequent calls hit the in-memory cache.
3596 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3597 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3598
3599 drop(client);
3600
3601 let client = server
3602 .client_builder()
3603 .no_server_versions()
3604 .on_builder(|builder| {
3605 builder.store_config(
3606 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3607 .state_store(memory_store.clone()),
3608 )
3609 })
3610 .build()
3611 .await;
3612
3613 // These calls to the new client hit the on-disk cache.
3614 assert!(
3615 client
3616 .unstable_features()
3617 .await
3618 .unwrap()
3619 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3620 );
3621 let supported = client.supported_versions().await.unwrap();
3622 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3623 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3624
3625 // Then this call hits the in-memory cache.
3626 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3627
3628 drop(versions_mock);
3629 drop(well_known_mock);
3630
3631 // Now, reset the cache, and observe the endpoints being called again once.
3632 client.reset_server_info().await.unwrap();
3633
3634 server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3635
3636 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3637
3638 // Hits network again.
3639 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3640 // Hits in-memory cache again.
3641 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3642 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3643 }
3644
3645 #[async_test]
3646 async fn test_server_info_without_a_well_known() {
3647 let server = MatrixMockServer::new().await;
3648 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3649
3650 let versions_mock = server
3651 .mock_versions()
3652 .ok_with_unstable_features()
3653 .named("first versions mock")
3654 .expect(1)
3655 .mount_as_scoped()
3656 .await;
3657
3658 let memory_store = Arc::new(MemoryStore::new());
3659 let client = server
3660 .client_builder()
3661 .no_server_versions()
3662 .on_builder(|builder| {
3663 builder.store_config(
3664 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3665 .state_store(memory_store.clone()),
3666 )
3667 })
3668 .build()
3669 .await;
3670
3671 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3672
3673 // These subsequent calls hit the in-memory cache.
3674 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3675 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3676
3677 drop(client);
3678
3679 let client = server
3680 .client_builder()
3681 .no_server_versions()
3682 .on_builder(|builder| {
3683 builder.store_config(
3684 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3685 .state_store(memory_store.clone()),
3686 )
3687 })
3688 .build()
3689 .await;
3690
3691 // This call to the new client hits the on-disk cache.
3692 assert!(
3693 client
3694 .unstable_features()
3695 .await
3696 .unwrap()
3697 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3698 );
3699
3700 // Then this call hits the in-memory cache.
3701 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3702
3703 drop(versions_mock);
3704
3705 // Now, reset the cache, and observe the endpoints being called again once.
3706 client.reset_server_info().await.unwrap();
3707
3708 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3709
3710 // Hits network again.
3711 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3712 // Hits in-memory cache again.
3713 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3714 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3715 }
3716
3717 #[async_test]
3718 async fn test_no_network_doesnt_cause_infinite_retries() {
3719 // We want infinite retries for transient errors.
3720 let client = MockClientBuilder::new(None)
3721 .on_builder(|builder| builder.request_config(RequestConfig::new()))
3722 .build()
3723 .await;
3724
3725 // We don't define a mock server on purpose here, so that the error is really a
3726 // network error.
3727 client.whoami().await.unwrap_err();
3728 }
3729
3730 #[async_test]
3731 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3732 let server = MatrixMockServer::new().await;
3733 let client = server.client_builder().build().await;
3734
3735 let room_id = room_id!("!room:example.org");
3736
3737 server
3738 .mock_sync()
3739 .ok_and_run(&client, |builder| {
3740 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3741 })
3742 .await;
3743
3744 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3745 assert_eq!(room.room_id(), room_id);
3746 }
3747
3748 #[async_test]
3749 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3750 let server = MatrixMockServer::new().await;
3751 let client = server.client_builder().build().await;
3752
3753 let room_id = room_id!("!room:example.org");
3754
3755 let client = Arc::new(client);
3756
3757 // Perform the /sync request with a delay so it starts after the
3758 // `await_room_remote_echo` call has happened
3759 spawn({
3760 let client = client.clone();
3761 async move {
3762 sleep(Duration::from_millis(100)).await;
3763
3764 server
3765 .mock_sync()
3766 .ok_and_run(&client, |builder| {
3767 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3768 })
3769 .await;
3770 }
3771 });
3772
3773 let room =
3774 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3775 assert_eq!(room.room_id(), room_id);
3776 }
3777
3778 #[async_test]
3779 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3780 let client = MockClientBuilder::new(None).build().await;
3781
3782 let room_id = room_id!("!room:example.org");
3783 // Room is not present so the client won't be able to find it. The call will
3784 // timeout.
3785 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3786 }
3787
3788 #[async_test]
3789 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3790 let server = MatrixMockServer::new().await;
3791 let client = server.client_builder().build().await;
3792
3793 server.mock_create_room().ok().mount().await;
3794
3795 // Create a room in the internal store
3796 let room = client
3797 .create_room(assign!(CreateRoomRequest::new(), {
3798 invite: vec![],
3799 is_direct: false,
3800 }))
3801 .await
3802 .unwrap();
3803
3804 // Room is locally present, but not synced, the call will timeout
3805 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3806 .await
3807 .unwrap_err();
3808 }
3809
3810 #[async_test]
3811 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3812 let server = MatrixMockServer::new().await;
3813 let client = server.client_builder().build().await;
3814
3815 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3816
3817 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3818 assert_matches!(ret, Ok(true));
3819 }
3820
3821 #[async_test]
3822 async fn test_is_room_alias_available_if_alias_is_resolved() {
3823 let server = MatrixMockServer::new().await;
3824 let client = server.client_builder().build().await;
3825
3826 server
3827 .mock_room_directory_resolve_alias()
3828 .ok("!some_room_id:matrix.org", Vec::new())
3829 .expect(1)
3830 .mount()
3831 .await;
3832
3833 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3834 assert_matches!(ret, Ok(false));
3835 }
3836
3837 #[async_test]
3838 async fn test_is_room_alias_available_if_error_found() {
3839 let server = MatrixMockServer::new().await;
3840 let client = server.client_builder().build().await;
3841
3842 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3843
3844 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3845 assert_matches!(ret, Err(_));
3846 }
3847
3848 #[async_test]
3849 async fn test_create_room_alias() {
3850 let server = MatrixMockServer::new().await;
3851 let client = server.client_builder().build().await;
3852
3853 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3854
3855 let ret = client
3856 .create_room_alias(
3857 room_alias_id!("#some_alias:matrix.org"),
3858 room_id!("!some_room:matrix.org"),
3859 )
3860 .await;
3861 assert_matches!(ret, Ok(()));
3862 }
3863
3864 #[async_test]
3865 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3866 let server = MatrixMockServer::new().await;
3867 let client = server.client_builder().build().await;
3868
3869 let room_id = room_id!("!a-room:matrix.org");
3870
3871 // Make sure the summary endpoint is called once
3872 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3873
3874 // We create a locally cached invited room
3875 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3876
3877 // And we get a preview, the server endpoint was reached
3878 let preview = client
3879 .get_room_preview(room_id.into(), Vec::new())
3880 .await
3881 .expect("Room preview should be retrieved");
3882
3883 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3884 }
3885
3886 #[async_test]
3887 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3888 let server = MatrixMockServer::new().await;
3889 let client = server.client_builder().build().await;
3890
3891 let room_id = room_id!("!a-room:matrix.org");
3892
3893 // Make sure the summary endpoint is called once
3894 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3895
3896 // We create a locally cached left room
3897 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3898
3899 // And we get a preview, the server endpoint was reached
3900 let preview = client
3901 .get_room_preview(room_id.into(), Vec::new())
3902 .await
3903 .expect("Room preview should be retrieved");
3904
3905 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3906 }
3907
3908 #[async_test]
3909 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3910 let server = MatrixMockServer::new().await;
3911 let client = server.client_builder().build().await;
3912
3913 let room_id = room_id!("!a-room:matrix.org");
3914
3915 // Make sure the summary endpoint is called once
3916 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3917
3918 // We create a locally cached knocked room
3919 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3920
3921 // And we get a preview, the server endpoint was reached
3922 let preview = client
3923 .get_room_preview(room_id.into(), Vec::new())
3924 .await
3925 .expect("Room preview should be retrieved");
3926
3927 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3928 }
3929
3930 #[async_test]
3931 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3932 let server = MatrixMockServer::new().await;
3933 let client = server.client_builder().build().await;
3934
3935 let room_id = room_id!("!a-room:matrix.org");
3936
3937 // Make sure the summary endpoint is not called
3938 server.mock_room_summary().ok(room_id).never().mount().await;
3939
3940 // We create a locally cached joined room
3941 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3942
3943 // And we get a preview, no server endpoint was reached
3944 let preview = client
3945 .get_room_preview(room_id.into(), Vec::new())
3946 .await
3947 .expect("Room preview should be retrieved");
3948
3949 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3950 }
3951
3952 #[async_test]
3953 async fn test_media_preview_config() {
3954 let server = MatrixMockServer::new().await;
3955 let client = server.client_builder().build().await;
3956
3957 server
3958 .mock_sync()
3959 .ok_and_run(&client, |builder| {
3960 builder.add_custom_global_account_data(json!({
3961 "content": {
3962 "media_previews": "private",
3963 "invite_avatars": "off"
3964 },
3965 "type": "m.media_preview_config"
3966 }));
3967 })
3968 .await;
3969
3970 let (initial_value, stream) =
3971 client.account().observe_media_preview_config().await.unwrap();
3972
3973 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3974 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3975 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3976 pin_mut!(stream);
3977 assert_pending!(stream);
3978
3979 server
3980 .mock_sync()
3981 .ok_and_run(&client, |builder| {
3982 builder.add_custom_global_account_data(json!({
3983 "content": {
3984 "media_previews": "off",
3985 "invite_avatars": "on"
3986 },
3987 "type": "m.media_preview_config"
3988 }));
3989 })
3990 .await;
3991
3992 assert_next_matches!(
3993 stream,
3994 MediaPreviewConfigEventContent {
3995 media_previews: Some(MediaPreviews::Off),
3996 invite_avatars: Some(InviteAvatars::On),
3997 ..
3998 }
3999 );
4000 assert_pending!(stream);
4001 }
4002
4003 #[async_test]
4004 async fn test_unstable_media_preview_config() {
4005 let server = MatrixMockServer::new().await;
4006 let client = server.client_builder().build().await;
4007
4008 server
4009 .mock_sync()
4010 .ok_and_run(&client, |builder| {
4011 builder.add_custom_global_account_data(json!({
4012 "content": {
4013 "media_previews": "private",
4014 "invite_avatars": "off"
4015 },
4016 "type": "io.element.msc4278.media_preview_config"
4017 }));
4018 })
4019 .await;
4020
4021 let (initial_value, stream) =
4022 client.account().observe_media_preview_config().await.unwrap();
4023
4024 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4025 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4026 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4027 pin_mut!(stream);
4028 assert_pending!(stream);
4029
4030 server
4031 .mock_sync()
4032 .ok_and_run(&client, |builder| {
4033 builder.add_custom_global_account_data(json!({
4034 "content": {
4035 "media_previews": "off",
4036 "invite_avatars": "on"
4037 },
4038 "type": "io.element.msc4278.media_preview_config"
4039 }));
4040 })
4041 .await;
4042
4043 assert_next_matches!(
4044 stream,
4045 MediaPreviewConfigEventContent {
4046 media_previews: Some(MediaPreviews::Off),
4047 invite_avatars: Some(InviteAvatars::On),
4048 ..
4049 }
4050 );
4051 assert_pending!(stream);
4052 }
4053
4054 #[async_test]
4055 async fn test_media_preview_config_not_found() {
4056 let server = MatrixMockServer::new().await;
4057 let client = server.client_builder().build().await;
4058
4059 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4060
4061 assert!(initial_value.is_none());
4062 }
4063
4064 #[async_test]
4065 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4066 // The default Matrix version we use is 1.11 or higher, so authenticated media
4067 // is supported.
4068 let server = MatrixMockServer::new().await;
4069 let client = server.client_builder().build().await;
4070
4071 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4072
4073 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4074 client.load_or_fetch_max_upload_size().await.unwrap();
4075
4076 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4077 }
4078
4079 #[async_test]
4080 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4081 // The server must advertise support for the stable feature for authenticated
4082 // media support, so we mock the `GET /versions` response.
4083 let server = MatrixMockServer::new().await;
4084 let client = server.client_builder().no_server_versions().build().await;
4085
4086 server
4087 .mock_versions()
4088 .ok_custom(
4089 &["v1.7", "v1.8", "v1.9", "v1.10"],
4090 &[("org.matrix.msc3916.stable", true)].into(),
4091 )
4092 .named("versions")
4093 .expect(1)
4094 .mount()
4095 .await;
4096
4097 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4098
4099 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4100 client.load_or_fetch_max_upload_size().await.unwrap();
4101
4102 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4103 }
4104
4105 #[async_test]
4106 async fn test_load_or_fetch_max_upload_size_no_auth() {
4107 // The server must not support Matrix 1.11 or higher for unauthenticated
4108 // media requests, so we mock the `GET /versions` response.
4109 let server = MatrixMockServer::new().await;
4110 let client = server.client_builder().no_server_versions().build().await;
4111
4112 server
4113 .mock_versions()
4114 .ok_custom(&["v1.1"], &Default::default())
4115 .named("versions")
4116 .expect(1)
4117 .mount()
4118 .await;
4119
4120 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4121
4122 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4123 client.load_or_fetch_max_upload_size().await.unwrap();
4124
4125 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4126 }
4127
4128 #[async_test]
4129 async fn test_uploading_a_too_large_media_file() {
4130 let server = MatrixMockServer::new().await;
4131 let client = server.client_builder().build().await;
4132
4133 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4134 client.load_or_fetch_max_upload_size().await.unwrap();
4135 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4136
4137 let data = vec![1, 2];
4138 let upload_request =
4139 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4140 let request = SendRequest {
4141 client: client.clone(),
4142 request: upload_request,
4143 config: None,
4144 send_progress: SharedObservable::new(TransmissionProgress::default()),
4145 };
4146 let media_request = SendMediaUploadRequest::new(request);
4147
4148 let error = media_request.await.err();
4149 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4150 assert_eq!(max, uint!(1));
4151 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4152 }
4153
4154 #[async_test]
4155 async fn test_dont_ignore_timeout_on_first_sync() {
4156 let server = MatrixMockServer::new().await;
4157 let client = server.client_builder().build().await;
4158
4159 server
4160 .mock_sync()
4161 .timeout(Some(Duration::from_secs(30)))
4162 .ok(|_| {})
4163 .mock_once()
4164 .named("sync_with_timeout")
4165 .mount()
4166 .await;
4167
4168 // Call the endpoint once to check the timeout.
4169 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4170
4171 timeout(Duration::from_secs(1), async {
4172 stream.next().await.unwrap().unwrap();
4173 })
4174 .await
4175 .unwrap();
4176 }
4177
4178 #[async_test]
4179 async fn test_ignore_timeout_on_first_sync() {
4180 let server = MatrixMockServer::new().await;
4181 let client = server.client_builder().build().await;
4182
4183 server
4184 .mock_sync()
4185 .timeout(None)
4186 .ok(|_| {})
4187 .mock_once()
4188 .named("sync_no_timeout")
4189 .mount()
4190 .await;
4191 server
4192 .mock_sync()
4193 .timeout(Some(Duration::from_secs(30)))
4194 .ok(|_| {})
4195 .mock_once()
4196 .named("sync_with_timeout")
4197 .mount()
4198 .await;
4199
4200 // Call each version of the endpoint once to check the timeouts.
4201 let mut stream = Box::pin(
4202 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4203 );
4204
4205 timeout(Duration::from_secs(1), async {
4206 stream.next().await.unwrap().unwrap();
4207 stream.next().await.unwrap().unwrap();
4208 })
4209 .await
4210 .unwrap();
4211 }
4212
4213 #[async_test]
4214 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4215 let server = MatrixMockServer::new().await;
4216 let client = server.client_builder().build().await;
4217 // This is the user ID that is inside MemberAdditional.
4218 // Note the confusing username, so we can share
4219 // GlobalAccountDataTestEvent::Direct with the invited test.
4220 let user_id = user_id!("@invited:localhost");
4221
4222 // When we receive a sync response saying "invited" is invited to a DM
4223 let f = EventFactory::new();
4224 let response = SyncResponseBuilder::default()
4225 .add_joined_room(
4226 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
4227 )
4228 .add_global_account_data(
4229 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4230 )
4231 .build_sync_response();
4232 client.base_client().receive_sync_response(response).await.unwrap();
4233
4234 // Then get_dm_room finds this room
4235 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4236 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4237 }
4238
4239 #[async_test]
4240 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4241 let server = MatrixMockServer::new().await;
4242 let client = server.client_builder().build().await;
4243 // This is the user ID that is inside MemberInvite
4244 let user_id = user_id!("@invited:localhost");
4245
4246 // When we receive a sync response saying "invited" is invited to a DM
4247 let f = EventFactory::new();
4248 let response = SyncResponseBuilder::default()
4249 .add_joined_room(
4250 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
4251 )
4252 .add_global_account_data(
4253 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4254 )
4255 .build_sync_response();
4256 client.base_client().receive_sync_response(response).await.unwrap();
4257
4258 // Then get_dm_room finds this room
4259 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4260 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4261 }
4262
4263 #[async_test]
4264 async fn test_get_dm_room_still_finds_left_room() {
4265 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4266 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4267
4268 let server = MatrixMockServer::new().await;
4269 let client = server.client_builder().build().await;
4270 // This is the user ID that is inside MemberAdditional.
4271 // Note the confusing username, so we can share
4272 // GlobalAccountDataTestEvent::Direct with the invited test.
4273 let user_id = user_id!("@invited:localhost");
4274
4275 // When we receive a sync response saying "invited" is invited to a DM
4276 let f = EventFactory::new();
4277 let response = SyncResponseBuilder::default()
4278 .add_joined_room(
4279 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
4280 )
4281 .add_global_account_data(
4282 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4283 )
4284 .build_sync_response();
4285 client.base_client().receive_sync_response(response).await.unwrap();
4286
4287 // Then get_dm_room finds this room
4288 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4289 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4290 }
4291
4292 #[async_test]
4293 async fn test_device_exists() {
4294 let server = MatrixMockServer::new().await;
4295 let client = server.client_builder().build().await;
4296
4297 server.mock_get_device().ok().expect(1).mount().await;
4298
4299 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4300 }
4301
4302 #[async_test]
4303 async fn test_device_exists_404() {
4304 let server = MatrixMockServer::new().await;
4305 let client = server.client_builder().build().await;
4306
4307 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4308 }
4309
4310 #[async_test]
4311 async fn test_device_exists_500() {
4312 let server = MatrixMockServer::new().await;
4313 let client = server.client_builder().build().await;
4314
4315 server.mock_get_device().error500().expect(1).mount().await;
4316
4317 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4318 }
4319}