matrix_sdk/room/mod.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39 IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44 StateChanges, StateStoreDataKey, StateStoreDataValue,
45 deserialized_responses::{
46 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47 },
48 media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49 serde_helpers::extract_relation,
50 store::{StateStoreExt, ThreadSubscriptionStatus},
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
54#[cfg(feature = "e2e-encryption")]
55use matrix_sdk_common::BoxFuture;
56use matrix_sdk_common::{
57 deserialized_responses::TimelineEvent,
58 executor::{JoinHandle, spawn},
59 timeout::timeout,
60};
61#[cfg(feature = "experimental-search")]
62use matrix_sdk_search::error::IndexError;
63#[cfg(feature = "experimental-search")]
64#[cfg(doc)]
65use matrix_sdk_search::index::RoomIndex;
66use mime::Mime;
67use reply::Reply;
68#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
69use ruma::events::AnySyncMessageLikeEvent;
70#[cfg(feature = "experimental-encrypted-state-events")]
71use ruma::events::AnySyncStateEvent;
72#[cfg(feature = "unstable-msc4274")]
73use ruma::events::room::message::GalleryItemType;
74#[cfg(feature = "e2e-encryption")]
75use ruma::events::{
76 AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
77};
78use ruma::{
79 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
80 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
81 api::client::{
82 config::{set_global_account_data, set_room_account_data},
83 context,
84 error::ErrorKind,
85 filter::LazyLoadOptions,
86 membership::{
87 Invite3pid, ban_user, forget_room, get_member_events,
88 invite_user::{
89 self,
90 v3::{InvitationRecipient, InviteUserId},
91 },
92 kick_user, leave_room, unban_user,
93 },
94 message::send_message_event,
95 read_marker::set_read_marker,
96 receipt::create_receipt,
97 redact::redact_event,
98 room::{get_room_event, report_content, report_room},
99 state::{get_state_event_for_key, send_state_event},
100 tag::{create_tag, delete_tag},
101 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
102 typing::create_typing_event::{
103 self,
104 v3::{Typing, TypingInfo},
105 },
106 },
107 assign,
108 events::{
109 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
110 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
111 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
112 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
113 StaticStateEventContent, SyncStateEvent,
114 beacon::BeaconEventContent,
115 beacon_info::BeaconInfoEventContent,
116 direct::DirectEventContent,
117 marked_unread::MarkedUnreadEventContent,
118 receipt::{Receipt, ReceiptThread, ReceiptType},
119 relation::RelationType,
120 room::{
121 ImageInfo, MediaSource, ThumbnailInfo,
122 avatar::{self, RoomAvatarEventContent},
123 encryption::PossiblyRedactedRoomEncryptionEventContent,
124 history_visibility::HistoryVisibility,
125 member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
126 message::{
127 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
128 ImageMessageEventContent, MessageType, RoomMessageEventContent,
129 TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
130 UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
131 },
132 name::RoomNameEventContent,
133 pinned_events::RoomPinnedEventsEventContent,
134 power_levels::{
135 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
136 },
137 server_acl::RoomServerAclEventContent,
138 topic::RoomTopicEventContent,
139 },
140 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
141 tag::{TagInfo, TagName},
142 typing::SyncTypingEvent,
143 },
144 int,
145 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
146 serde::Raw,
147 time::Instant,
148 uint,
149};
150#[cfg(feature = "experimental-encrypted-state-events")]
151use ruma::{
152 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
153 serde::JsonCastable,
154};
155use serde::de::DeserializeOwned;
156use thiserror::Error;
157use tokio::{join, sync::broadcast};
158use tracing::{debug, error, info, instrument, trace, warn};
159
160use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
161pub use self::{
162 member::{RoomMember, RoomMemberRole},
163 messages::{
164 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
165 Relations, RelationsOptions, ThreadRoots,
166 },
167};
168#[cfg(feature = "e2e-encryption")]
169use crate::encryption::backups::BackupState;
170#[cfg(doc)]
171use crate::event_cache::EventCache;
172#[cfg(feature = "experimental-encrypted-state-events")]
173use crate::room::futures::{SendRawStateEvent, SendStateEvent};
174use crate::{
175 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
176 attachment::{AttachmentConfig, AttachmentInfo},
177 client::WeakClient,
178 config::RequestConfig,
179 error::{BeaconError, WrongRoomState},
180 event_cache::{self, EventCacheDropHandles, RoomEventCache},
181 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
182 live_location_share::ObservableLiveLocation,
183 media::{MediaFormat, MediaRequestParameters},
184 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
185 room::{
186 knock_requests::{KnockRequest, KnockRequestMemberInfo},
187 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
188 privacy_settings::RoomPrivacySettings,
189 },
190 sync::RoomUpdate,
191 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
192};
193
194pub mod edit;
195pub mod futures;
196pub mod identity_status_changes;
197/// Contains code related to requests to join a room.
198pub mod knock_requests;
199mod member;
200mod messages;
201pub mod power_levels;
202pub mod reply;
203
204pub mod calls;
205
206/// Contains all the functionality for modifying the privacy settings in a room.
207pub mod privacy_settings;
208
209#[cfg(feature = "e2e-encryption")]
210pub(crate) mod shared_room_history;
211
212/// A struct containing methods that are common for Joined, Invited and Left
213/// Rooms
214#[derive(Debug, Clone)]
215pub struct Room {
216 inner: BaseRoom,
217 pub(crate) client: Client,
218}
219
220impl Deref for Room {
221 type Target = BaseRoom;
222
223 fn deref(&self) -> &Self::Target {
224 &self.inner
225 }
226}
227
228const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
229const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
230
231/// A thread subscription, according to the semantics of MSC4306.
232#[derive(Debug, Clone, Copy, PartialEq, Eq)]
233pub struct ThreadSubscription {
234 /// Whether the subscription was made automatically by a client, not by
235 /// manual user choice.
236 pub automatic: bool,
237}
238
239/// Context allowing to compute the push actions for a given event.
240#[derive(Debug)]
241pub struct PushContext {
242 /// The Ruma context used to compute the push actions.
243 push_condition_room_ctx: PushConditionRoomCtx,
244
245 /// Push rules for this room, based on the push rules state event, or the
246 /// global server default as defined by [`Ruleset::server_default`].
247 push_rules: Ruleset,
248}
249
250impl PushContext {
251 /// Create a new [`PushContext`] from its inner components.
252 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
253 Self { push_condition_room_ctx, push_rules }
254 }
255
256 /// Compute the push rules for a given event.
257 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
258 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
259 }
260
261 /// Compute the push rules for a given event, with extra logging to help
262 /// debugging.
263 #[doc(hidden)]
264 #[instrument(skip_all)]
265 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
266 let rules = self
267 .push_rules
268 .iter()
269 .filter_map(|r| {
270 if !r.enabled() {
271 return None;
272 }
273
274 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
275
276 let conditions = match r {
277 AnyPushRuleRef::Override(r) => {
278 format!("{:?}", r.conditions)
279 }
280 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
281 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
282 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
283 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
284 _ => "<unknown push rule kind>".to_owned(),
285 };
286
287 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
288 })
289 .collect::<Vec<_>>()
290 .join("\n");
291 trace!("rules:\n\n{rules}\n\n");
292
293 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
294
295 if let Some(found) = found {
296 trace!("rule {} matched", found.rule_id());
297 found.actions().to_owned()
298 } else {
299 trace!("no match");
300 Vec::new()
301 }
302 }
303}
304
305macro_rules! make_media_type {
306 ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
307 // If caption is set, use it as body, and filename as the file name; otherwise,
308 // body is the filename, and the filename is not set.
309 // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
310 let (body, formatted, filename) = match $caption {
311 Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
312 None => ($filename, None, None),
313 };
314
315 let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
316
317 match $content_type.type_() {
318 mime::IMAGE => {
319 let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
320 mimetype: Some($content_type.as_ref().to_owned()),
321 thumbnail_source,
322 thumbnail_info
323 });
324 let content = assign!(ImageMessageEventContent::new(body, $source), {
325 info: Some(Box::new(info)),
326 formatted,
327 filename
328 });
329 <$t>::Image(content)
330 }
331
332 mime::AUDIO => {
333 let mut content = assign!(AudioMessageEventContent::new(body, $source), {
334 formatted,
335 filename
336 });
337
338 if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
339 let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
340 let waveform = waveform_vec
341 .iter()
342 .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
343 .map(Into::into)
344 .collect();
345 content.audio =
346 Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
347 }
348
349 if matches!($info, Some(AttachmentInfo::Voice(_))) {
350 content.voice = Some(UnstableVoiceContentBlock::new());
351 }
352
353 let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
354 audio_info.mimetype = Some($content_type.as_ref().to_owned());
355 let content = content.info(Box::new(audio_info));
356
357 <$t>::Audio(content)
358 }
359
360 mime::VIDEO => {
361 let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
362 mimetype: Some($content_type.as_ref().to_owned()),
363 thumbnail_source,
364 thumbnail_info
365 });
366 let content = assign!(VideoMessageEventContent::new(body, $source), {
367 info: Some(Box::new(info)),
368 formatted,
369 filename
370 });
371 <$t>::Video(content)
372 }
373
374 _ => {
375 let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
376 mimetype: Some($content_type.as_ref().to_owned()),
377 thumbnail_source,
378 thumbnail_info
379 });
380 let content = assign!(FileMessageEventContent::new(body, $source), {
381 info: Some(Box::new(info)),
382 formatted,
383 filename,
384 });
385 <$t>::File(content)
386 }
387 }
388 }};
389}
390
391impl Room {
392 /// Create a new `Room`
393 ///
394 /// # Arguments
395 /// * `client` - The client used to make requests.
396 ///
397 /// * `room` - The underlying room.
398 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
399 Self { inner: room, client }
400 }
401
402 /// Leave this room.
403 /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
404 /// automatically.
405 ///
406 /// Only invited and joined rooms can be left.
407 #[doc(alias = "reject_invitation")]
408 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
409 async fn leave_impl(&self) -> (Result<()>, &Room) {
410 let state = self.state();
411 if state == RoomState::Left {
412 return (
413 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
414 "Joined or Invited",
415 state,
416 )))),
417 self,
418 );
419 }
420
421 // If the room was in Invited state we should also forget it when declining the
422 // invite.
423 let should_forget = matches!(self.state(), RoomState::Invited);
424
425 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
426 let response = self.client.send(request).await;
427
428 // The server can return with an error that is acceptable to ignore. Let's find
429 // which one.
430 if let Err(error) = response {
431 #[allow(clippy::collapsible_match)]
432 let ignore_error = if let Some(error) = error.client_api_error_kind() {
433 match error {
434 // The user is trying to leave a room but doesn't have permissions to do so.
435 // Let's consider the user has left the room.
436 ErrorKind::Forbidden => true,
437 _ => false,
438 }
439 } else {
440 false
441 };
442
443 error!(?error, ignore_error, should_forget, "Failed to leave the room");
444
445 if !ignore_error {
446 return (Err(error.into()), self);
447 }
448 }
449
450 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
451 return (Err(e.into()), self);
452 }
453
454 if should_forget {
455 trace!("Trying to forget the room");
456
457 if let Err(error) = self.forget().await {
458 error!(?error, "Failed to forget the room");
459 }
460 }
461
462 (Ok(()), self)
463 }
464
465 /// Leave this room and all predecessors.
466 /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
467 /// automatically.
468 ///
469 /// Only invited and joined rooms can be left.
470 /// Will return an error if the current room fails to leave but
471 /// will only warn if a predecessor fails to leave.
472 pub async fn leave(&self) -> Result<()> {
473 let mut rooms: Vec<Room> = vec![self.clone()];
474 let mut current_room = self;
475
476 while let Some(predecessor) = current_room.predecessor_room() {
477 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
478
479 if let Some(predecessor_room) = maybe_predecessor_room {
480 rooms.push(predecessor_room.clone());
481 current_room = rooms.last().expect("Room just pushed so can't be empty");
482 } else {
483 warn!("Cannot find predecessor room");
484 break;
485 }
486 }
487
488 let batch_size = 5;
489
490 let rooms_futures: Vec<_> = rooms
491 .iter()
492 .filter_map(|room| match room.state() {
493 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
494 Some(room.leave_impl())
495 }
496 RoomState::Banned | RoomState::Left => None,
497 })
498 .collect();
499
500 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
501
502 let mut maybe_this_room_failed_with: Option<Error> = None;
503
504 while let Some(result) = futures_stream.next().await {
505 if let (Err(e), room) = result {
506 if room.room_id() == self.room_id() {
507 maybe_this_room_failed_with = Some(e);
508 } else {
509 warn!("Failure while attempting to leave predecessor room: {e:?}");
510 }
511 }
512 }
513
514 maybe_this_room_failed_with.map_or(Ok(()), Err)
515 }
516
517 /// Join this room.
518 ///
519 /// Only invited and left rooms can be joined via this method.
520 #[doc(alias = "accept_invitation")]
521 pub async fn join(&self) -> Result<()> {
522 let prev_room_state = self.inner.state();
523
524 if prev_room_state == RoomState::Joined {
525 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
526 "Invited or Left",
527 prev_room_state,
528 ))));
529 }
530
531 self.client.join_room_by_id(self.room_id()).await?;
532
533 Ok(())
534 }
535
536 /// Get the inner client saved in this room instance.
537 ///
538 /// Returns the client this room is part of.
539 pub fn client(&self) -> Client {
540 self.client.clone()
541 }
542
543 /// Get the sync state of this room, i.e. whether it was fully synced with
544 /// the server.
545 pub fn is_synced(&self) -> bool {
546 self.inner.is_state_fully_synced()
547 }
548
549 /// Gets the avatar of this room, if set.
550 ///
551 /// Returns the avatar.
552 /// If a thumbnail is requested no guarantee on the size of the image is
553 /// given.
554 ///
555 /// # Arguments
556 ///
557 /// * `format` - The desired format of the avatar.
558 ///
559 /// # Examples
560 ///
561 /// ```no_run
562 /// # use matrix_sdk::Client;
563 /// # use matrix_sdk::ruma::room_id;
564 /// # use matrix_sdk::media::MediaFormat;
565 /// # use url::Url;
566 /// # let homeserver = Url::parse("http://example.com").unwrap();
567 /// # async {
568 /// # let user = "example";
569 /// let client = Client::new(homeserver).await.unwrap();
570 /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
571 /// let room_id = room_id!("!roomid:example.com");
572 /// let room = client.get_room(&room_id).unwrap();
573 /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
574 /// std::fs::write("avatar.png", avatar);
575 /// }
576 /// # };
577 /// ```
578 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
579 let Some(url) = self.avatar_url() else { return Ok(None) };
580 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
581 Ok(Some(self.client.media().get_media_content(&request, true).await?))
582 }
583
584 /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
585 /// returns a `Messages` struct that contains a chunk of room and state
586 /// events (`RoomEvent` and `AnyStateEvent`).
587 ///
588 /// With the encryption feature, messages are decrypted if possible. If
589 /// decryption fails for an individual message, that message is returned
590 /// undecrypted.
591 ///
592 /// # Examples
593 ///
594 /// ```no_run
595 /// use matrix_sdk::{Client, room::MessagesOptions};
596 /// # use matrix_sdk::ruma::{
597 /// # api::client::filter::RoomEventFilter,
598 /// # room_id,
599 /// # };
600 /// # use url::Url;
601 ///
602 /// # let homeserver = Url::parse("http://example.com").unwrap();
603 /// # async {
604 /// let options =
605 /// MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
606 ///
607 /// let mut client = Client::new(homeserver).await.unwrap();
608 /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
609 /// assert!(room.messages(options).await.is_ok());
610 /// # };
611 /// ```
612 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
613 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
614 let room_id = self.inner.room_id();
615 let request = options.into_request(room_id);
616 let http_response = self.client.send(request).await?;
617
618 let push_ctx = self.push_context().await?;
619 let chunk = join_all(
620 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
621 )
622 .await;
623
624 // Save the loaded events into the event cache, if it's set up.
625 if let Ok((cache, _handles)) = self.event_cache().await {
626 cache.save_events(chunk.clone()).await;
627 }
628
629 Ok(Messages {
630 start: http_response.start,
631 end: http_response.end,
632 chunk,
633 state: http_response.state,
634 })
635 }
636
637 /// Register a handler for events of a specific type, within this room.
638 ///
639 /// This method works the same way as [`Client::add_event_handler`], except
640 /// that the handler will only be called for events within this room. See
641 /// that method for more details on event handler functions.
642 ///
643 /// `room.add_event_handler(hdl)` is equivalent to
644 /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
645 /// convenient in your use case.
646 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
647 where
648 Ev: SyncEvent + DeserializeOwned + Send + 'static,
649 H: EventHandler<Ev, Ctx>,
650 {
651 self.client.add_room_event_handler(self.room_id(), handler)
652 }
653
654 /// Subscribe to all updates for this room.
655 ///
656 /// The returned receiver will receive a new message for each sync response
657 /// that contains updates for this room.
658 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
659 self.client.subscribe_to_room_updates(self.room_id())
660 }
661
662 /// Subscribe to typing notifications for this room.
663 ///
664 /// The returned receiver will receive a new vector of user IDs for each
665 /// sync response that contains 'm.typing' event. The current user ID will
666 /// be filtered out.
667 pub fn subscribe_to_typing_notifications(
668 &self,
669 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
670 let (sender, receiver) = broadcast::channel(16);
671 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
672 let own_user_id = self.own_user_id().to_owned();
673 move |event: SyncTypingEvent| async move {
674 // Ignore typing notifications from own user.
675 let typing_user_ids = event
676 .content
677 .user_ids
678 .into_iter()
679 .filter(|user_id| *user_id != own_user_id)
680 .collect();
681 // Ignore the result. It can only fail if there are no listeners.
682 let _ = sender.send(typing_user_ids);
683 }
684 });
685 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
686 (drop_guard, receiver)
687 }
688
689 /// Subscribe to updates about users who are in "pin violation" i.e. their
690 /// identity has changed and the user has not yet acknowledged this.
691 ///
692 /// The returned receiver will receive a new vector of
693 /// [`IdentityStatusChange`] each time a /keys/query response shows a
694 /// changed identity for a member of this room, or a sync shows a change
695 /// to the membership of an affected user. (Changes to the current user are
696 /// not directly included, but some changes to the current user's identity
697 /// can trigger changes to how we see other users' identities, which
698 /// will be included.)
699 ///
700 /// The first item in the stream provides the current state of the room:
701 /// each member of the room who is not in "pinned" or "verified" state will
702 /// be included (except the current user).
703 ///
704 /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
705 /// `PinViolation` then a warning should be displayed to the user. If it is
706 /// set to `Pinned` then no warning should be displayed.
707 ///
708 /// Note that if a user who is in pin violation leaves the room, a `Pinned`
709 /// update is sent, to indicate that the warning should be removed, even
710 /// though the user's identity is not necessarily pinned.
711 #[cfg(feature = "e2e-encryption")]
712 pub async fn subscribe_to_identity_status_changes(
713 &self,
714 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
715 IdentityStatusChanges::create_stream(self.clone()).await
716 }
717
718 /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
719 /// decrypted if needs be.
720 ///
721 /// Only logs from the crypto crate will indicate a failure to decrypt.
722 #[cfg(not(feature = "experimental-encrypted-state-events"))]
723 #[allow(clippy::unused_async)] // Used only in e2e-encryption.
724 async fn try_decrypt_event(
725 &self,
726 event: Raw<AnyTimelineEvent>,
727 push_ctx: Option<&PushContext>,
728 ) -> TimelineEvent {
729 #[cfg(feature = "e2e-encryption")]
730 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
731 SyncMessageLikeEvent::Original(_),
732 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
733 && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
734 {
735 return event;
736 }
737
738 let mut event = TimelineEvent::from_plaintext(event.cast());
739 if let Some(push_ctx) = push_ctx {
740 event.set_push_actions(push_ctx.for_event(event.raw()).await);
741 }
742
743 event
744 }
745
746 /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
747 /// decrypted if needs be.
748 ///
749 /// Only logs from the crypto crate will indicate a failure to decrypt.
750 #[cfg(feature = "experimental-encrypted-state-events")]
751 #[allow(clippy::unused_async)] // Used only in e2e-encryption.
752 async fn try_decrypt_event(
753 &self,
754 event: Raw<AnyTimelineEvent>,
755 push_ctx: Option<&PushContext>,
756 ) -> TimelineEvent {
757 // If we have either an encrypted message-like or state event, try to decrypt.
758 match event.deserialize_as::<AnySyncTimelineEvent>() {
759 Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
760 SyncMessageLikeEvent::Original(_),
761 ))) => {
762 if let Ok(event) = self
763 .decrypt_event(
764 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
765 push_ctx,
766 )
767 .await
768 {
769 return event;
770 }
771 }
772 Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
773 SyncStateEvent::Original(_),
774 ))) => {
775 if let Ok(event) = self
776 .decrypt_event(
777 event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
778 push_ctx,
779 )
780 .await
781 {
782 return event;
783 }
784 }
785 _ => {}
786 }
787
788 let mut event = TimelineEvent::from_plaintext(event.cast());
789 if let Some(push_ctx) = push_ctx {
790 event.set_push_actions(push_ctx.for_event(event.raw()).await);
791 }
792
793 event
794 }
795
796 /// Fetch the event with the given `EventId` in this room.
797 ///
798 /// It uses the given [`RequestConfig`] if provided, or the client's default
799 /// one otherwise.
800 pub async fn event(
801 &self,
802 event_id: &EventId,
803 request_config: Option<RequestConfig>,
804 ) -> Result<TimelineEvent> {
805 let request =
806 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
807
808 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
809 let push_ctx = self.push_context().await?;
810 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
811
812 // Save the event into the event cache, if it's set up.
813 if let Ok((cache, _handles)) = self.event_cache().await {
814 cache.save_events([event.clone()]).await;
815 }
816
817 Ok(event)
818 }
819
820 /// Try to load the event from the [`EventCache`][crate::event_cache], if
821 /// it's enabled, or fetch it from the homeserver.
822 ///
823 /// When running the request against the homeserver, it uses the given
824 /// [`RequestConfig`] if provided, or the client's default one
825 /// otherwise.
826 pub async fn load_or_fetch_event(
827 &self,
828 event_id: &EventId,
829 request_config: Option<RequestConfig>,
830 ) -> Result<TimelineEvent> {
831 match self.event_cache().await {
832 Ok((event_cache, _drop_handles)) => {
833 if let Some(event) = event_cache.find_event(event_id).await? {
834 return Ok(event);
835 }
836 // Fallthrough: try with a request.
837 }
838 Err(err) => {
839 debug!("error when getting the event cache: {err}");
840 }
841 }
842 self.event(event_id, request_config).await
843 }
844
845 /// Try to load the event and its relations from the
846 /// [`EventCache`][crate::event_cache], if it's enabled, or fetch it
847 /// from the homeserver.
848 ///
849 /// You can control which types of related events are retrieved using
850 /// `filter`. A `None` value will retrieve any type of related event.
851 ///
852 /// If the event is found in the event cache, but we can't find any
853 /// relations for it there, then we will still attempt to fetch the
854 /// relations from the homeserver.
855 ///
856 /// When running any request against the homeserver, it uses the given
857 /// [`RequestConfig`] if provided, or the client's default one
858 /// otherwise.
859 ///
860 /// Returns a tuple formed of the event and a vector of its relations (that
861 /// can be empty).
862 pub async fn load_or_fetch_event_with_relations(
863 &self,
864 event_id: &EventId,
865 filter: Option<Vec<RelationType>>,
866 request_config: Option<RequestConfig>,
867 ) -> Result<(TimelineEvent, Vec<TimelineEvent>)> {
868 let fetch_relations = async || {
869 // If there's only a single filter, we can use a more efficient request,
870 // specialized on the filter type.
871 //
872 // Otherwise, we need to get all the relations:
873 // - either because no filters implies we fetch all relations,
874 // - or because there are multiple filters and we must filter out manually.
875 let include_relations = if let Some(filter) = &filter
876 && filter.len() == 1
877 {
878 IncludeRelations::RelationsOfType(filter[0].clone())
879 } else {
880 IncludeRelations::AllRelations
881 };
882
883 let mut opts = RelationsOptions {
884 include_relations,
885 recurse: true,
886 limit: Some(uint!(256)),
887 ..Default::default()
888 };
889
890 let mut events = Vec::new();
891 loop {
892 match self.relations(event_id.to_owned(), opts.clone()).await {
893 Ok(relations) => {
894 if let Some(filter) = filter.as_ref() {
895 // Manually filter out the relation types we're interested in.
896 events.extend(relations.chunk.into_iter().filter_map(|ev| {
897 let (rel_type, _) = extract_relation(ev.raw())?;
898 filter
899 .iter()
900 .any(|ruma_filter| ruma_filter == &rel_type)
901 .then_some(ev)
902 }));
903 } else {
904 // No filter: include all events from the response.
905 events.extend(relations.chunk);
906 }
907
908 if let Some(next_from) = relations.next_batch_token {
909 opts.from = Some(next_from);
910 } else {
911 break events;
912 }
913 }
914
915 Err(err) => {
916 warn!(%event_id, "error when loading relations of pinned event from server: {err}");
917 break events;
918 }
919 }
920 }
921 };
922
923 // First, try to load the event *and* its relations from the event cache, all at
924 // once.
925 let event_cache = match self.event_cache().await {
926 Ok((event_cache, drop_handles)) => {
927 if let Some((event, mut relations)) =
928 event_cache.find_event_with_relations(event_id, filter.clone()).await?
929 {
930 if relations.is_empty() {
931 // The event cache doesn't have any relations for this event, try to fetch
932 // them from the server instead.
933 relations = fetch_relations().await;
934 }
935
936 return Ok((event, relations));
937 }
938
939 // Otherwise, get the event from the server.
940 Some((event_cache, drop_handles))
941 }
942
943 Err(err) => {
944 debug!("error when getting the event cache: {err}");
945 // Fallthrough: try with a request.
946 None
947 }
948 };
949
950 // Fetch the event from the server. A failure here is fatal, as we must return
951 // the target event.
952 let event = self.event(event_id, request_config).await?;
953
954 // Try to get the relations from the event cache (if we have one).
955 if let Some((event_cache, _drop_handles)) = event_cache
956 && let Some(relations) =
957 event_cache.find_event_relations(event_id, filter.clone()).await.ok()
958 && !relations.is_empty()
959 {
960 return Ok((event, relations));
961 }
962
963 // We couldn't find the relations in the event cache; fetch them from the
964 // server.
965 Ok((event, fetch_relations().await))
966 }
967
968 /// Fetch the event with the given `EventId` in this room, using the
969 /// `/context` endpoint to get more information.
970 pub async fn event_with_context(
971 &self,
972 event_id: &EventId,
973 lazy_load_members: bool,
974 context_size: UInt,
975 request_config: Option<RequestConfig>,
976 ) -> Result<EventWithContextResponse> {
977 let mut request =
978 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
979
980 request.limit = context_size;
981
982 if lazy_load_members {
983 request.filter.lazy_load_options =
984 LazyLoadOptions::Enabled { include_redundant_members: false };
985 }
986
987 let response = self.client.send(request).with_request_config(request_config).await?;
988
989 let push_ctx = self.push_context().await?;
990 let push_ctx = push_ctx.as_ref();
991 let target_event = if let Some(event) = response.event {
992 Some(self.try_decrypt_event(event, push_ctx).await)
993 } else {
994 None
995 };
996
997 // Note: the joined future will fail if any future failed, but
998 // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
999 // decryption error, so we should prevent against most bad cases here.
1000 let (events_before, events_after) = join!(
1001 join_all(
1002 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1003 ),
1004 join_all(
1005 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1006 ),
1007 );
1008
1009 // Save the loaded events into the event cache, if it's set up.
1010 if let Ok((cache, _handles)) = self.event_cache().await {
1011 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
1012 if let Some(event) = &target_event {
1013 events_to_save.push(event.clone());
1014 }
1015
1016 for event in &events_before {
1017 events_to_save.push(event.clone());
1018 }
1019
1020 for event in &events_after {
1021 events_to_save.push(event.clone());
1022 }
1023
1024 cache.save_events(events_to_save).await;
1025 }
1026
1027 Ok(EventWithContextResponse {
1028 event: target_event,
1029 events_before,
1030 events_after,
1031 state: response.state,
1032 prev_batch_token: response.start,
1033 next_batch_token: response.end,
1034 })
1035 }
1036
1037 pub(crate) async fn request_members(&self) -> Result<()> {
1038 self.client
1039 .locks()
1040 .members_request_deduplicated_handler
1041 .run(self.room_id().to_owned(), async move {
1042 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
1043 let response = self
1044 .client
1045 .send(request.clone())
1046 .with_request_config(
1047 // In some cases it can take longer than 30s to load:
1048 // https://github.com/element-hq/synapse/issues/16872
1049 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
1050 )
1051 .await?;
1052
1053 // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
1054 Box::pin(self.client.base_client().receive_all_members(
1055 self.room_id(),
1056 &request,
1057 &response,
1058 ))
1059 .await?;
1060
1061 Ok(())
1062 })
1063 .await
1064 }
1065
1066 /// Request to update the encryption state for this room.
1067 ///
1068 /// It does nothing if the encryption state is already
1069 /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
1070 pub async fn request_encryption_state(&self) -> Result<()> {
1071 if !self.inner.encryption_state().is_unknown() {
1072 return Ok(());
1073 }
1074
1075 self.client
1076 .locks()
1077 .encryption_state_deduplicated_handler
1078 .run(self.room_id().to_owned(), async move {
1079 // Request the event from the server.
1080 let request = get_state_event_for_key::v3::Request::new(
1081 self.room_id().to_owned(),
1082 StateEventType::RoomEncryption,
1083 "".to_owned(),
1084 );
1085 let response = match self.client.send(request).await {
1086 Ok(response) => Some(
1087 response
1088 .into_content()
1089 .deserialize_as_unchecked::<PossiblyRedactedRoomEncryptionEventContent>(
1090 )?,
1091 ),
1092 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
1093 Err(err) => return Err(err.into()),
1094 };
1095
1096 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
1097
1098 // Persist the event and the fact that we requested it from the server in
1099 // `RoomInfo`.
1100 let mut room_info = self.clone_info();
1101 room_info.mark_encryption_state_synced();
1102 room_info.set_encryption_event(response);
1103 let mut changes = StateChanges::default();
1104 changes.add_room(room_info.clone());
1105
1106 self.client.state_store().save_changes(&changes).await?;
1107 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1108
1109 Ok(())
1110 })
1111 .await
1112 }
1113
1114 /// Check the encryption state of this room.
1115 ///
1116 /// If the result is [`EncryptionState::Unknown`], one might want to call
1117 /// [`Room::request_encryption_state`].
1118 pub fn encryption_state(&self) -> EncryptionState {
1119 self.inner.encryption_state()
1120 }
1121
1122 /// Force to update the encryption state by calling
1123 /// [`Room::request_encryption_state`], and then calling
1124 /// [`Room::encryption_state`].
1125 ///
1126 /// This method is useful to ensure the encryption state is up-to-date.
1127 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
1128 self.request_encryption_state().await?;
1129
1130 Ok(self.encryption_state())
1131 }
1132
1133 /// Gets additional context info about the client crypto.
1134 #[cfg(feature = "e2e-encryption")]
1135 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
1136 let encryption = self.client.encryption();
1137
1138 let this_device_is_verified = match encryption.get_own_device().await {
1139 Ok(Some(device)) => device.is_verified_with_cross_signing(),
1140
1141 // Should not happen, there will always be an own device
1142 _ => true,
1143 };
1144
1145 let backup_exists_on_server =
1146 encryption.backups().exists_on_server().await.unwrap_or(false);
1147
1148 CryptoContextInfo {
1149 device_creation_ts: encryption.device_creation_timestamp().await,
1150 this_device_is_verified,
1151 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1152 backup_exists_on_server,
1153 }
1154 }
1155
1156 fn are_events_visible(&self) -> bool {
1157 if let RoomState::Invited = self.inner.state() {
1158 return matches!(
1159 self.inner.history_visibility_or_default(),
1160 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1161 );
1162 }
1163
1164 true
1165 }
1166
1167 /// Sync the member list with the server.
1168 ///
1169 /// This method will de-duplicate requests if it is called multiple times in
1170 /// quick succession, in that case the return value will be `None`. This
1171 /// method does nothing if the members are already synced.
1172 pub async fn sync_members(&self) -> Result<()> {
1173 if !self.are_events_visible() {
1174 return Ok(());
1175 }
1176
1177 if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1178 }
1179
1180 /// Get a specific member of this room.
1181 ///
1182 /// *Note*: This method will fetch the members from the homeserver if the
1183 /// member list isn't synchronized due to member lazy loading. Because of
1184 /// that it might panic if it isn't run on a tokio thread.
1185 ///
1186 /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1187 /// method that doesn't do any requests.
1188 ///
1189 /// # Arguments
1190 ///
1191 /// * `user_id` - The ID of the user that should be fetched out of the
1192 /// store.
1193 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1194 self.sync_members().await?;
1195 self.get_member_no_sync(user_id).await
1196 }
1197
1198 /// Get a specific member of this room.
1199 ///
1200 /// *Note*: This method will not fetch the members from the homeserver if
1201 /// the member list isn't synchronized due to member lazy loading. Thus,
1202 /// members could be missing.
1203 ///
1204 /// Use [get_member()](#method.get_member) if you want to ensure to always
1205 /// have the full member list to chose from.
1206 ///
1207 /// # Arguments
1208 ///
1209 /// * `user_id` - The ID of the user that should be fetched out of the
1210 /// store.
1211 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1212 Ok(self
1213 .inner
1214 .get_member(user_id)
1215 .await?
1216 .map(|member| RoomMember::new(self.client.clone(), member)))
1217 }
1218
1219 /// Get members for this room, with the given memberships.
1220 ///
1221 /// *Note*: This method will fetch the members from the homeserver if the
1222 /// member list isn't synchronized due to member lazy loading. Because of
1223 /// that it might panic if it isn't run on a tokio thread.
1224 ///
1225 /// Use [members_no_sync()](#method.members_no_sync) if you want a
1226 /// method that doesn't do any requests.
1227 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1228 self.sync_members().await?;
1229 self.members_no_sync(memberships).await
1230 }
1231
1232 /// Get members for this room, with the given memberships.
1233 ///
1234 /// *Note*: This method will not fetch the members from the homeserver if
1235 /// the member list isn't synchronized due to member lazy loading. Thus,
1236 /// members could be missing.
1237 ///
1238 /// Use [members()](#method.members) if you want to ensure to always get
1239 /// the full member list.
1240 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1241 Ok(self
1242 .inner
1243 .members(memberships)
1244 .await?
1245 .into_iter()
1246 .map(|member| RoomMember::new(self.client.clone(), member))
1247 .collect())
1248 }
1249
1250 /// Sets the display name of the current user within this room.
1251 ///
1252 /// *Note*: This is different to [`crate::Account::set_display_name`] which
1253 /// updates the user's display name across all of their rooms.
1254 pub async fn set_own_member_display_name(
1255 &self,
1256 display_name: Option<String>,
1257 ) -> Result<send_state_event::v3::Response> {
1258 let user_id = self.own_user_id();
1259 let member_event =
1260 self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1261
1262 let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1263 return Err(Error::InsufficientData);
1264 };
1265
1266 let event = raw_event.deserialize()?;
1267
1268 let mut content = match event {
1269 SyncStateEvent::Original(original_event) => original_event.content,
1270 SyncStateEvent::Redacted(redacted_event) => {
1271 RoomMemberEventContent::new(redacted_event.content.membership)
1272 }
1273 };
1274
1275 content.displayname = display_name;
1276 self.send_state_event_for_key(user_id, content).await
1277 }
1278
1279 /// Get all state events of a given type in this room.
1280 pub async fn get_state_events(
1281 &self,
1282 event_type: StateEventType,
1283 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1284 self.client
1285 .state_store()
1286 .get_state_events(self.room_id(), event_type)
1287 .await
1288 .map_err(Into::into)
1289 }
1290
1291 /// Get all state events of a given statically-known type in this room.
1292 ///
1293 /// # Examples
1294 ///
1295 /// ```no_run
1296 /// # async {
1297 /// # let room: matrix_sdk::Room = todo!();
1298 /// use matrix_sdk::ruma::{
1299 /// events::room::member::RoomMemberEventContent, serde::Raw,
1300 /// };
1301 ///
1302 /// let room_members =
1303 /// room.get_state_events_static::<RoomMemberEventContent>().await?;
1304 /// # anyhow::Ok(())
1305 /// # };
1306 /// ```
1307 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1308 where
1309 C: StaticEventContent<IsPrefix = ruma::events::False>
1310 + StaticStateEventContent
1311 + RedactContent,
1312 C::Redacted: RedactedStateEventContent,
1313 {
1314 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1315 }
1316
1317 /// Get the state events of a given type with the given state keys in this
1318 /// room.
1319 pub async fn get_state_events_for_keys(
1320 &self,
1321 event_type: StateEventType,
1322 state_keys: &[&str],
1323 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1324 self.client
1325 .state_store()
1326 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1327 .await
1328 .map_err(Into::into)
1329 }
1330
1331 /// Get the state events of a given statically-known type with the given
1332 /// state keys in this room.
1333 ///
1334 /// # Examples
1335 ///
1336 /// ```no_run
1337 /// # async {
1338 /// # let room: matrix_sdk::Room = todo!();
1339 /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1340 /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1341 ///
1342 /// let room_members = room
1343 /// .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1344 /// user_ids,
1345 /// )
1346 /// .await?;
1347 /// # anyhow::Ok(())
1348 /// # };
1349 /// ```
1350 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1351 &self,
1352 state_keys: I,
1353 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1354 where
1355 C: StaticEventContent<IsPrefix = ruma::events::False>
1356 + StaticStateEventContent
1357 + RedactContent,
1358 C::StateKey: Borrow<K>,
1359 C::Redacted: RedactedStateEventContent,
1360 K: AsRef<str> + Sized + Sync + 'a,
1361 I: IntoIterator<Item = &'a K> + Send,
1362 I::IntoIter: Send,
1363 {
1364 Ok(self
1365 .client
1366 .state_store()
1367 .get_state_events_for_keys_static(self.room_id(), state_keys)
1368 .await?)
1369 }
1370
1371 /// Get a specific state event in this room.
1372 pub async fn get_state_event(
1373 &self,
1374 event_type: StateEventType,
1375 state_key: &str,
1376 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1377 self.client
1378 .state_store()
1379 .get_state_event(self.room_id(), event_type, state_key)
1380 .await
1381 .map_err(Into::into)
1382 }
1383
1384 /// Get a specific state event of statically-known type with an empty state
1385 /// key in this room.
1386 ///
1387 /// # Examples
1388 ///
1389 /// ```no_run
1390 /// # async {
1391 /// # let room: matrix_sdk::Room = todo!();
1392 /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1393 ///
1394 /// let power_levels = room
1395 /// .get_state_event_static::<RoomPowerLevelsEventContent>()
1396 /// .await?
1397 /// .expect("every room has a power_levels event")
1398 /// .deserialize()?;
1399 /// # anyhow::Ok(())
1400 /// # };
1401 /// ```
1402 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1403 where
1404 C: StaticEventContent<IsPrefix = ruma::events::False>
1405 + StaticStateEventContent<StateKey = EmptyStateKey>
1406 + RedactContent,
1407 C::Redacted: RedactedStateEventContent,
1408 {
1409 self.get_state_event_static_for_key(&EmptyStateKey).await
1410 }
1411
1412 /// Get a specific state event of statically-known type in this room.
1413 ///
1414 /// # Examples
1415 ///
1416 /// ```no_run
1417 /// # async {
1418 /// # let room: matrix_sdk::Room = todo!();
1419 /// use matrix_sdk::ruma::{
1420 /// events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1421 /// };
1422 ///
1423 /// let member_event = room
1424 /// .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1425 /// "@alice:example.org"
1426 /// ))
1427 /// .await?;
1428 /// # anyhow::Ok(())
1429 /// # };
1430 /// ```
1431 pub async fn get_state_event_static_for_key<C, K>(
1432 &self,
1433 state_key: &K,
1434 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1435 where
1436 C: StaticEventContent<IsPrefix = ruma::events::False>
1437 + StaticStateEventContent
1438 + RedactContent,
1439 C::StateKey: Borrow<K>,
1440 C::Redacted: RedactedStateEventContent,
1441 K: AsRef<str> + ?Sized + Sync,
1442 {
1443 Ok(self
1444 .client
1445 .state_store()
1446 .get_state_event_static_for_key(self.room_id(), state_key)
1447 .await?)
1448 }
1449
1450 /// Returns the parents this room advertises as its parents.
1451 ///
1452 /// Results are in no particular order.
1453 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1454 // Implements this algorithm:
1455 // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1456
1457 // Get all m.space.parent events for this room
1458 Ok(self
1459 .get_state_events_static::<SpaceParentEventContent>()
1460 .await?
1461 .into_iter()
1462 // Extract state key (ie. the parent's id) and sender
1463 .filter_map(|parent_event| match parent_event.deserialize() {
1464 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1465 Some((e.state_key.to_owned(), e.sender))
1466 }
1467 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1468 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1469 Err(e) => {
1470 info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1471 None
1472 }
1473 })
1474 // Check whether the parent recognizes this room as its child
1475 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1476 let Some(parent_room) = self.client.get_room(&state_key) else {
1477 // We are not in the room, cannot check if the relationship is reciprocal
1478 // TODO: try peeking into the room
1479 return Ok(ParentSpace::Unverifiable(state_key));
1480 };
1481 // Get the m.space.child state of the parent with this room's id
1482 // as state key.
1483 if let Some(child_event) = parent_room
1484 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1485 .await?
1486 {
1487 match child_event.deserialize() {
1488 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1489 // There is a valid m.space.child in the parent pointing to
1490 // this room
1491 return Ok(ParentSpace::Reciprocal(parent_room));
1492 }
1493 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1494 Ok(SyncOrStrippedState::Stripped(_)) => {}
1495 Err(e) => {
1496 info!(
1497 room_id = ?self.room_id(), parent_room_id = ?state_key,
1498 "Could not deserialize m.space.child: {e}"
1499 );
1500 }
1501 }
1502 // Otherwise the event is either invalid or redacted. If
1503 // redacted it would be missing the
1504 // `via` key, thereby invalidating that end of the
1505 // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1506 }
1507
1508 // No reciprocal m.space.child found, let's check if the sender has the
1509 // power to set it
1510 let Some(member) = parent_room.get_member(&sender).await? else {
1511 // Sender is not even in the parent room
1512 return Ok(ParentSpace::Illegitimate(parent_room));
1513 };
1514
1515 if member.can_send_state(StateEventType::SpaceChild) {
1516 // Sender does have the power to set m.room.child
1517 Ok(ParentSpace::WithPowerlevel(parent_room))
1518 } else {
1519 Ok(ParentSpace::Illegitimate(parent_room))
1520 }
1521 })
1522 .collect::<FuturesUnordered<_>>())
1523 }
1524
1525 /// Read account data in this room, from storage.
1526 pub async fn account_data(
1527 &self,
1528 data_type: RoomAccountDataEventType,
1529 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1530 self.client
1531 .state_store()
1532 .get_room_account_data_event(self.room_id(), data_type)
1533 .await
1534 .map_err(Into::into)
1535 }
1536
1537 /// Get account data of a statically-known type in this room, from storage.
1538 ///
1539 /// # Examples
1540 ///
1541 /// ```no_run
1542 /// # async {
1543 /// # let room: matrix_sdk::Room = todo!();
1544 /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1545 ///
1546 /// match room.account_data_static::<FullyReadEventContent>().await? {
1547 /// Some(fully_read) => {
1548 /// println!("Found read marker: {:?}", fully_read.deserialize()?)
1549 /// }
1550 /// None => println!("No read marker for this room"),
1551 /// }
1552 /// # anyhow::Ok(())
1553 /// # };
1554 /// ```
1555 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1556 where
1557 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1558 {
1559 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1560 }
1561
1562 /// Check if all members of this room are verified and all their devices are
1563 /// verified.
1564 ///
1565 /// Returns true if all devices in the room are verified, otherwise false.
1566 #[cfg(feature = "e2e-encryption")]
1567 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1568 let user_ids = self
1569 .client
1570 .state_store()
1571 .get_user_ids(self.room_id(), RoomMemberships::empty())
1572 .await?;
1573
1574 for user_id in user_ids {
1575 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1576 let any_unverified = devices.devices().any(|d| !d.is_verified());
1577
1578 if any_unverified {
1579 return Ok(false);
1580 }
1581 }
1582
1583 Ok(true)
1584 }
1585
1586 /// Set the given account data event for this room.
1587 ///
1588 /// # Example
1589 /// ```
1590 /// # async {
1591 /// # let room: matrix_sdk::Room = todo!();
1592 /// # let event_id: ruma::OwnedEventId = todo!();
1593 /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1594 /// let content = FullyReadEventContent::new(event_id);
1595 ///
1596 /// room.set_account_data(content).await?;
1597 /// # anyhow::Ok(())
1598 /// # };
1599 /// ```
1600 pub async fn set_account_data<T>(
1601 &self,
1602 content: T,
1603 ) -> Result<set_room_account_data::v3::Response>
1604 where
1605 T: RoomAccountDataEventContent,
1606 {
1607 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1608
1609 let request = set_room_account_data::v3::Request::new(
1610 own_user.to_owned(),
1611 self.room_id().to_owned(),
1612 &content,
1613 )?;
1614
1615 Ok(self.client.send(request).await?)
1616 }
1617
1618 /// Set the given raw account data event in this room.
1619 ///
1620 /// # Example
1621 /// ```
1622 /// # async {
1623 /// # let room: matrix_sdk::Room = todo!();
1624 /// use matrix_sdk::ruma::{
1625 /// events::{
1626 /// AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1627 /// marked_unread::MarkedUnreadEventContent,
1628 /// },
1629 /// serde::Raw,
1630 /// };
1631 /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1632 /// let full_event: AnyRoomAccountDataEventContent =
1633 /// marked_unread_content.clone().into();
1634 /// room.set_account_data_raw(
1635 /// marked_unread_content.event_type(),
1636 /// Raw::new(&full_event).unwrap(),
1637 /// )
1638 /// .await?;
1639 /// # anyhow::Ok(())
1640 /// # };
1641 /// ```
1642 pub async fn set_account_data_raw(
1643 &self,
1644 event_type: RoomAccountDataEventType,
1645 content: Raw<AnyRoomAccountDataEventContent>,
1646 ) -> Result<set_room_account_data::v3::Response> {
1647 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1648
1649 let request = set_room_account_data::v3::Request::new_raw(
1650 own_user.to_owned(),
1651 self.room_id().to_owned(),
1652 event_type,
1653 content,
1654 );
1655
1656 Ok(self.client.send(request).await?)
1657 }
1658
1659 /// Adds a tag to the room, or updates it if it already exists.
1660 ///
1661 /// Returns the [`create_tag::v3::Response`] from the server.
1662 ///
1663 /// # Arguments
1664 /// * `tag` - The tag to add or update.
1665 ///
1666 /// * `tag_info` - Information about the tag, generally containing the
1667 /// `order` parameter.
1668 ///
1669 /// # Examples
1670 ///
1671 /// ```no_run
1672 /// # use std::str::FromStr;
1673 /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1674 /// # async {
1675 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1676 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1677 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1678 /// use matrix_sdk::ruma::events::tag::TagInfo;
1679 ///
1680 /// if let Some(room) = client.get_room(&room_id) {
1681 /// let mut tag_info = TagInfo::new();
1682 /// tag_info.order = Some(0.9);
1683 /// let user_tag = UserTagName::from_str("u.work")?;
1684 ///
1685 /// room.set_tag(TagName::User(user_tag), tag_info).await?;
1686 /// }
1687 /// # anyhow::Ok(()) };
1688 /// ```
1689 pub async fn set_tag(
1690 &self,
1691 tag: TagName,
1692 tag_info: TagInfo,
1693 ) -> Result<create_tag::v3::Response> {
1694 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1695 let request = create_tag::v3::Request::new(
1696 user_id.to_owned(),
1697 self.inner.room_id().to_owned(),
1698 tag.to_string(),
1699 tag_info,
1700 );
1701 Ok(self.client.send(request).await?)
1702 }
1703
1704 /// Removes a tag from the room.
1705 ///
1706 /// Returns the [`delete_tag::v3::Response`] from the server.
1707 ///
1708 /// # Arguments
1709 /// * `tag` - The tag to remove.
1710 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1711 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1712 let request = delete_tag::v3::Request::new(
1713 user_id.to_owned(),
1714 self.inner.room_id().to_owned(),
1715 tag.to_string(),
1716 );
1717 Ok(self.client.send(request).await?)
1718 }
1719
1720 /// Add or remove the `m.favourite` flag for this room.
1721 ///
1722 /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1723 /// room, the tag will be removed too.
1724 ///
1725 /// # Arguments
1726 ///
1727 /// * `is_favourite` - Whether to mark this room as favourite.
1728 /// * `tag_order` - The order of the tag if any.
1729 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1730 if is_favourite {
1731 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1732
1733 self.set_tag(TagName::Favorite, tag_info).await?;
1734
1735 if self.is_low_priority() {
1736 self.remove_tag(TagName::LowPriority).await?;
1737 }
1738 } else {
1739 self.remove_tag(TagName::Favorite).await?;
1740 }
1741 Ok(())
1742 }
1743
1744 /// Add or remove the `m.lowpriority` flag for this room.
1745 ///
1746 /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1747 /// room, the tag will be removed too.
1748 ///
1749 /// # Arguments
1750 ///
1751 /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1752 /// * `tag_order` - The order of the tag if any.
1753 pub async fn set_is_low_priority(
1754 &self,
1755 is_low_priority: bool,
1756 tag_order: Option<f64>,
1757 ) -> Result<()> {
1758 if is_low_priority {
1759 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1760
1761 self.set_tag(TagName::LowPriority, tag_info).await?;
1762
1763 if self.is_favourite() {
1764 self.remove_tag(TagName::Favorite).await?;
1765 }
1766 } else {
1767 self.remove_tag(TagName::LowPriority).await?;
1768 }
1769 Ok(())
1770 }
1771
1772 /// Sets whether this room is a DM.
1773 ///
1774 /// When setting this room as DM, it will be marked as DM for all active
1775 /// members of the room. When unsetting this room as DM, it will be
1776 /// unmarked as DM for all users, not just the members.
1777 ///
1778 /// # Arguments
1779 /// * `is_direct` - Whether to mark this room as direct.
1780 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1781 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1782
1783 let mut content = self
1784 .client
1785 .account()
1786 .account_data::<DirectEventContent>()
1787 .await?
1788 .map(|c| c.deserialize())
1789 .transpose()?
1790 .unwrap_or_default();
1791
1792 let this_room_id = self.inner.room_id();
1793
1794 if is_direct {
1795 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1796 room_members.retain(|member| member.user_id() != self.own_user_id());
1797
1798 for member in room_members {
1799 let entry = content.entry(member.user_id().into()).or_default();
1800 if !entry.iter().any(|room_id| room_id == this_room_id) {
1801 entry.push(this_room_id.to_owned());
1802 }
1803 }
1804 } else {
1805 for (_, list) in content.iter_mut() {
1806 list.retain(|room_id| *room_id != this_room_id);
1807 }
1808
1809 // Remove user ids that don't have any room marked as DM
1810 content.retain(|_, list| !list.is_empty());
1811 }
1812
1813 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1814
1815 self.client.send(request).await?;
1816 Ok(())
1817 }
1818
1819 /// Tries to decrypt a room event.
1820 ///
1821 /// # Arguments
1822 /// * `event` - The room event to be decrypted.
1823 ///
1824 /// Returns the decrypted event. In the case of a decryption error, returns
1825 /// a `TimelineEvent` representing the decryption error.
1826 #[cfg(feature = "e2e-encryption")]
1827 #[cfg(not(feature = "experimental-encrypted-state-events"))]
1828 pub async fn decrypt_event(
1829 &self,
1830 event: &Raw<OriginalSyncRoomEncryptedEvent>,
1831 push_ctx: Option<&PushContext>,
1832 ) -> Result<TimelineEvent> {
1833 let machine = self.client.olm_machine().await;
1834 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1835
1836 match machine
1837 .try_decrypt_room_event(
1838 event.cast_ref(),
1839 self.inner.room_id(),
1840 self.client.decryption_settings(),
1841 )
1842 .await?
1843 {
1844 RoomEventDecryptionResult::Decrypted(decrypted) => {
1845 let push_actions = if let Some(push_ctx) = push_ctx {
1846 Some(push_ctx.for_event(&decrypted.event).await)
1847 } else {
1848 None
1849 };
1850 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1851 }
1852 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1853 self.client
1854 .encryption()
1855 .backups()
1856 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1857 Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1858 }
1859 }
1860 }
1861
1862 /// Tries to decrypt a room event.
1863 ///
1864 /// # Arguments
1865 /// * `event` - The room event to be decrypted.
1866 ///
1867 /// Returns the decrypted event. In the case of a decryption error, returns
1868 /// a `TimelineEvent` representing the decryption error.
1869 #[cfg(feature = "experimental-encrypted-state-events")]
1870 pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1871 &self,
1872 event: &Raw<T>,
1873 push_ctx: Option<&PushContext>,
1874 ) -> Result<TimelineEvent> {
1875 let machine = self.client.olm_machine().await;
1876 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1877
1878 match machine
1879 .try_decrypt_room_event(
1880 event.cast_ref(),
1881 self.inner.room_id(),
1882 self.client.decryption_settings(),
1883 )
1884 .await?
1885 {
1886 RoomEventDecryptionResult::Decrypted(decrypted) => {
1887 let push_actions = if let Some(push_ctx) = push_ctx {
1888 Some(push_ctx.for_event(&decrypted.event).await)
1889 } else {
1890 None
1891 };
1892 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1893 }
1894 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1895 self.client
1896 .encryption()
1897 .backups()
1898 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1899 // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1900 // event.
1901 Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1902 }
1903 }
1904 }
1905
1906 /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1907 /// session_id.
1908 ///
1909 /// This may be used when we receive an update for a session, and we want to
1910 /// reflect the changes in messages we have received that were encrypted
1911 /// with that session, e.g. to remove a warning shield because a device is
1912 /// now verified.
1913 ///
1914 /// # Arguments
1915 /// * `session_id` - The ID of the Megolm session to get information for.
1916 /// * `sender` - The (claimed) sender of the event where the session was
1917 /// used.
1918 #[cfg(feature = "e2e-encryption")]
1919 pub async fn get_encryption_info(
1920 &self,
1921 session_id: &str,
1922 sender: &UserId,
1923 ) -> Option<Arc<EncryptionInfo>> {
1924 let machine = self.client.olm_machine().await;
1925 let machine = machine.as_ref()?;
1926 machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1927 }
1928
1929 /// Forces the currently active room key, which is used to encrypt messages,
1930 /// to be rotated.
1931 ///
1932 /// A new room key will be crated and shared with all the room members the
1933 /// next time a message will be sent. You don't have to call this method,
1934 /// room keys will be rotated automatically when necessary. This method is
1935 /// still useful for debugging purposes.
1936 ///
1937 /// For more info please take a look a the [`encryption`] module
1938 /// documentation.
1939 ///
1940 /// [`encryption`]: crate::encryption
1941 #[cfg(feature = "e2e-encryption")]
1942 pub async fn discard_room_key(&self) -> Result<()> {
1943 let machine = self.client.olm_machine().await;
1944 if let Some(machine) = machine.as_ref() {
1945 machine.discard_room_key(self.inner.room_id()).await?;
1946 Ok(())
1947 } else {
1948 Err(Error::NoOlmMachine)
1949 }
1950 }
1951
1952 /// Ban the user with `UserId` from this room.
1953 ///
1954 /// # Arguments
1955 ///
1956 /// * `user_id` - The user to ban with `UserId`.
1957 ///
1958 /// * `reason` - The reason for banning this user.
1959 #[instrument(skip_all)]
1960 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1961 let request = assign!(
1962 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1963 { reason: reason.map(ToOwned::to_owned) }
1964 );
1965 self.client.send(request).await?;
1966 Ok(())
1967 }
1968
1969 /// Unban the user with `UserId` from this room.
1970 ///
1971 /// # Arguments
1972 ///
1973 /// * `user_id` - The user to unban with `UserId`.
1974 ///
1975 /// * `reason` - The reason for unbanning this user.
1976 #[instrument(skip_all)]
1977 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1978 let request = assign!(
1979 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1980 { reason: reason.map(ToOwned::to_owned) }
1981 );
1982 self.client.send(request).await?;
1983 Ok(())
1984 }
1985
1986 /// Kick a user out of this room.
1987 ///
1988 /// # Arguments
1989 ///
1990 /// * `user_id` - The `UserId` of the user that should be kicked out of the
1991 /// room.
1992 ///
1993 /// * `reason` - Optional reason why the room member is being kicked out.
1994 #[instrument(skip_all)]
1995 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1996 let request = assign!(
1997 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1998 { reason: reason.map(ToOwned::to_owned) }
1999 );
2000 self.client.send(request).await?;
2001 Ok(())
2002 }
2003
2004 /// Invite the specified user by `UserId` to this room.
2005 ///
2006 /// # Arguments
2007 ///
2008 /// * `user_id` - The `UserId` of the user to invite to the room.
2009 #[instrument(skip_all)]
2010 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
2011 #[cfg(feature = "e2e-encryption")]
2012 if self.client.inner.enable_share_history_on_invite {
2013 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
2014 }
2015
2016 let recipient = InvitationRecipient::UserId(InviteUserId::new(user_id.to_owned()));
2017 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2018 self.client.send(request).await?;
2019
2020 // Force a future room members reload before sending any event to prevent UTDs
2021 // that can happen when some event is sent after a room member has been invited
2022 // but before the /sync request could fetch the membership change event.
2023 self.mark_members_missing();
2024
2025 Ok(())
2026 }
2027
2028 /// Invite the specified user by third party id to this room.
2029 ///
2030 /// # Arguments
2031 ///
2032 /// * `invite_id` - A third party id of a user to invite to the room.
2033 #[instrument(skip_all)]
2034 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
2035 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
2036 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2037 self.client.send(request).await?;
2038
2039 // Force a future room members reload before sending any event to prevent UTDs
2040 // that can happen when some event is sent after a room member has been invited
2041 // but before the /sync request could fetch the membership change event.
2042 self.mark_members_missing();
2043
2044 Ok(())
2045 }
2046
2047 /// Activate typing notice for this room.
2048 ///
2049 /// The typing notice remains active for 4s. It can be deactivate at any
2050 /// point by setting typing to `false`. If this method is called while
2051 /// the typing notice is active nothing will happen. This method can be
2052 /// called on every key stroke, since it will do nothing while typing is
2053 /// active.
2054 ///
2055 /// # Arguments
2056 ///
2057 /// * `typing` - Whether the user is typing or has stopped typing.
2058 ///
2059 /// # Examples
2060 ///
2061 /// ```no_run
2062 /// use std::time::Duration;
2063 ///
2064 /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
2065 /// # use matrix_sdk::{
2066 /// # Client, config::SyncSettings,
2067 /// # ruma::room_id,
2068 /// # };
2069 /// # use url::Url;
2070 ///
2071 /// # async {
2072 /// # let homeserver = Url::parse("http://localhost:8080")?;
2073 /// # let client = Client::new(homeserver).await?;
2074 /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2075 ///
2076 /// if let Some(room) = client.get_room(&room_id) {
2077 /// room.typing_notice(true).await?
2078 /// }
2079 /// # anyhow::Ok(()) };
2080 /// ```
2081 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
2082 self.ensure_room_joined()?;
2083
2084 // Only send a request to the homeserver if the old timeout has elapsed
2085 // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
2086 let send = if let Some(typing_time) =
2087 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
2088 {
2089 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
2090 // We always reactivate the typing notice if typing is true or
2091 // we may need to deactivate it if it's
2092 // currently active if typing is false
2093 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
2094 } else {
2095 // Only send a request when we need to deactivate typing
2096 !typing
2097 }
2098 } else {
2099 // Typing notice is currently deactivated, therefore, send a request
2100 // only when it's about to be activated
2101 typing
2102 };
2103
2104 if send {
2105 self.send_typing_notice(typing).await?;
2106 }
2107
2108 Ok(())
2109 }
2110
2111 #[instrument(name = "typing_notice", skip(self))]
2112 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
2113 let typing = if typing {
2114 self.client
2115 .inner
2116 .typing_notice_times
2117 .write()
2118 .unwrap()
2119 .insert(self.room_id().to_owned(), Instant::now());
2120 Typing::Yes(TypingInfo::new(TYPING_NOTICE_TIMEOUT))
2121 } else {
2122 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
2123 Typing::No
2124 };
2125
2126 let request = create_typing_event::v3::Request::new(
2127 self.own_user_id().to_owned(),
2128 self.room_id().to_owned(),
2129 typing,
2130 );
2131
2132 self.client.send(request).await?;
2133
2134 Ok(())
2135 }
2136
2137 /// Send a request to set a single receipt.
2138 ///
2139 /// If an unthreaded receipt is sent, this will also unset the unread flag
2140 /// of the room if necessary.
2141 ///
2142 /// # Arguments
2143 ///
2144 /// * `receipt_type` - The type of the receipt to set. Note that it is
2145 /// possible to set the fully-read marker although it is technically not a
2146 /// receipt.
2147 ///
2148 /// * `thread` - The thread where this receipt should apply, if any. Note
2149 /// that this must be [`ReceiptThread::Unthreaded`] when sending a
2150 /// [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
2151 ///
2152 /// * `event_id` - The `EventId` of the event to set the receipt on.
2153 #[instrument(skip_all)]
2154 pub async fn send_single_receipt(
2155 &self,
2156 receipt_type: create_receipt::v3::ReceiptType,
2157 thread: ReceiptThread,
2158 event_id: OwnedEventId,
2159 ) -> Result<()> {
2160 // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
2161 // string key.
2162 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2163
2164 self.client
2165 .inner
2166 .locks
2167 .read_receipt_deduplicated_handler
2168 .run((request_key, event_id.clone()), async {
2169 // We will unset the unread flag if we send an unthreaded receipt.
2170 let is_unthreaded = thread == ReceiptThread::Unthreaded;
2171
2172 let mut request = create_receipt::v3::Request::new(
2173 self.room_id().to_owned(),
2174 receipt_type,
2175 event_id,
2176 );
2177 request.thread = thread;
2178
2179 self.client.send(request).await?;
2180
2181 if is_unthreaded {
2182 self.set_unread_flag(false).await?;
2183 }
2184
2185 Ok(())
2186 })
2187 .await
2188 }
2189
2190 /// Send a request to set multiple receipts at once.
2191 ///
2192 /// This will also unset the unread flag of the room if necessary.
2193 ///
2194 /// # Arguments
2195 ///
2196 /// * `receipts` - The `Receipts` to send.
2197 ///
2198 /// If `receipts` is empty, this is a no-op.
2199 #[instrument(skip_all)]
2200 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2201 if receipts.is_empty() {
2202 return Ok(());
2203 }
2204
2205 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2206 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2207 fully_read,
2208 read_receipt: public_read_receipt,
2209 private_read_receipt,
2210 });
2211
2212 self.client.send(request).await?;
2213
2214 self.set_unread_flag(false).await?;
2215
2216 Ok(())
2217 }
2218
2219 /// Helper function to enable End-to-end encryption in this room.
2220 /// `encrypted_state_events` is not used unless the
2221 /// `experimental-encrypted-state-events` feature is enabled.
2222 #[allow(unused_variables, unused_mut)]
2223 async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2224 use ruma::{
2225 EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2226 };
2227 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2228
2229 if !self.latest_encryption_state().await?.is_encrypted() {
2230 let mut content =
2231 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2232 #[cfg(feature = "experimental-encrypted-state-events")]
2233 if encrypted_state_events {
2234 content = content.with_encrypted_state();
2235 }
2236 self.send_state_event(content).await?;
2237
2238 // Spin on the sync beat event, since the first sync we receive might not
2239 // include the encryption event.
2240 //
2241 // TODO do we want to return an error here if we time out? This
2242 // could be quite useful if someone wants to enable encryption and
2243 // send a message right after it's enabled.
2244 let res = timeout(
2245 async {
2246 loop {
2247 // Listen for sync events, then check if the encryption state is known.
2248 self.client.inner.sync_beat.listen().await;
2249 let _state_store_lock =
2250 self.client.base_client().state_store_lock().lock().await;
2251
2252 if !self.inner.encryption_state().is_unknown() {
2253 break;
2254 }
2255 }
2256 },
2257 SYNC_WAIT_TIME,
2258 )
2259 .await;
2260
2261 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2262
2263 // If encryption was enabled, return.
2264 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2265 if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2266 debug!("room successfully marked as encrypted");
2267 return Ok(());
2268 }
2269
2270 // If encryption with state event encryption was enabled, return.
2271 #[cfg(feature = "experimental-encrypted-state-events")]
2272 if res.is_ok() && {
2273 if encrypted_state_events {
2274 self.inner.encryption_state().is_state_encrypted()
2275 } else {
2276 self.inner.encryption_state().is_encrypted()
2277 }
2278 } {
2279 debug!("room successfully marked as encrypted");
2280 return Ok(());
2281 }
2282
2283 // If after waiting for multiple syncs, we don't have the encryption state we
2284 // expect, assume the local encryption state is incorrect; this will
2285 // cause the SDK to re-request it later for confirmation, instead of
2286 // assuming it's sync'd and correct (and not encrypted).
2287 debug!("still not marked as encrypted, marking encryption state as missing");
2288
2289 let mut room_info = self.clone_info();
2290 room_info.mark_encryption_state_missing();
2291 let mut changes = StateChanges::default();
2292 changes.add_room(room_info.clone());
2293
2294 self.client.state_store().save_changes(&changes).await?;
2295 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2296 }
2297
2298 Ok(())
2299 }
2300
2301 /// Enable End-to-end encryption in this room.
2302 ///
2303 /// This method will be a noop if encryption is already enabled, otherwise
2304 /// sends a `m.room.encryption` state event to the room. This might fail if
2305 /// you don't have the appropriate power level to enable end-to-end
2306 /// encryption.
2307 ///
2308 /// A sync needs to be received to update the local room state. This method
2309 /// will wait for a sync to be received, this might time out if no
2310 /// sync loop is running or if the server is slow.
2311 ///
2312 /// # Examples
2313 ///
2314 /// ```no_run
2315 /// # use matrix_sdk::{
2316 /// # Client, config::SyncSettings,
2317 /// # ruma::room_id,
2318 /// # };
2319 /// # use url::Url;
2320 /// #
2321 /// # async {
2322 /// # let homeserver = Url::parse("http://localhost:8080")?;
2323 /// # let client = Client::new(homeserver).await?;
2324 /// # let room_id = room_id!("!test:localhost");
2325 /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2326 ///
2327 /// if let Some(room) = client.get_room(&room_id) {
2328 /// room.enable_encryption().await?
2329 /// }
2330 /// # anyhow::Ok(()) };
2331 /// ```
2332 #[instrument(skip_all)]
2333 pub async fn enable_encryption(&self) -> Result<()> {
2334 self.enable_encryption_inner(false).await
2335 }
2336
2337 /// Enable End-to-end encryption in this room, opting into experimental
2338 /// state event encryption.
2339 ///
2340 /// This method will be a noop if encryption is already enabled, otherwise
2341 /// sends a `m.room.encryption` state event to the room. This might fail if
2342 /// you don't have the appropriate power level to enable end-to-end
2343 /// encryption.
2344 ///
2345 /// A sync needs to be received to update the local room state. This method
2346 /// will wait for a sync to be received, this might time out if no
2347 /// sync loop is running or if the server is slow.
2348 ///
2349 /// # Examples
2350 ///
2351 /// ```no_run
2352 /// # use matrix_sdk::{
2353 /// # Client, config::SyncSettings,
2354 /// # ruma::room_id,
2355 /// # };
2356 /// # use url::Url;
2357 /// #
2358 /// # async {
2359 /// # let homeserver = Url::parse("http://localhost:8080")?;
2360 /// # let client = Client::new(homeserver).await?;
2361 /// # let room_id = room_id!("!test:localhost");
2362 /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2363 ///
2364 /// if let Some(room) = client.get_room(&room_id) {
2365 /// room.enable_encryption_with_state_event_encryption().await?
2366 /// }
2367 /// # anyhow::Ok(()) };
2368 /// ```
2369 #[instrument(skip_all)]
2370 #[cfg(feature = "experimental-encrypted-state-events")]
2371 pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2372 self.enable_encryption_inner(true).await
2373 }
2374
2375 /// Share a room key with users in the given room.
2376 ///
2377 /// This will create Olm sessions with all the users/device pairs in the
2378 /// room if necessary and share a room key that can be shared with them.
2379 ///
2380 /// Does nothing if no room key needs to be shared.
2381 // TODO: expose this publicly so people can pre-share a group session if
2382 // e.g. a user starts to type a message for a room.
2383 #[cfg(feature = "e2e-encryption")]
2384 #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
2385 async fn preshare_room_key(&self) -> Result<()> {
2386 self.ensure_room_joined()?;
2387
2388 // Take and release the lock on the store, if needs be.
2389 let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2390 tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
2391
2392 self.client
2393 .locks()
2394 .group_session_deduplicated_handler
2395 .run(self.room_id().to_owned(), async move {
2396 {
2397 let members = self
2398 .client
2399 .state_store()
2400 .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2401 .await?;
2402 self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2403 };
2404
2405 let response = self.share_room_key().await;
2406
2407 // If one of the responses failed invalidate the group
2408 // session as using it would end up in undecryptable
2409 // messages.
2410 if let Err(r) = response {
2411 let machine = self.client.olm_machine().await;
2412 if let Some(machine) = machine.as_ref() {
2413 machine.discard_room_key(self.room_id()).await?;
2414 }
2415 return Err(r);
2416 }
2417
2418 Ok(())
2419 })
2420 .await
2421 }
2422
2423 /// Share a group session for a room.
2424 ///
2425 /// # Panics
2426 ///
2427 /// Panics if the client isn't logged in.
2428 #[cfg(feature = "e2e-encryption")]
2429 #[instrument(skip_all)]
2430 async fn share_room_key(&self) -> Result<()> {
2431 self.ensure_room_joined()?;
2432
2433 let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2434
2435 for request in requests {
2436 let response = self.client.send_to_device(&request).await?;
2437 self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2438 }
2439
2440 Ok(())
2441 }
2442
2443 /// Wait for the room to be fully synced.
2444 ///
2445 /// This method makes sure the room that was returned when joining a room
2446 /// has been echoed back in the sync.
2447 ///
2448 /// Warning: This waits until a sync happens and does not return if no sync
2449 /// is happening. It can also return early when the room is not a joined
2450 /// room anymore.
2451 #[instrument(skip_all)]
2452 pub async fn sync_up(&self) {
2453 while !self.is_synced() && self.state() == RoomState::Joined {
2454 let wait_for_beat = self.client.inner.sync_beat.listen();
2455 // We don't care whether it's a timeout or a sync beat.
2456 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2457 }
2458 }
2459
2460 /// Send a message-like event to this room.
2461 ///
2462 /// Returns the parsed response from the server.
2463 ///
2464 /// If the encryption feature is enabled this method will transparently
2465 /// encrypt the event if this room is encrypted (except for `m.reaction`
2466 /// events, which are never encrypted).
2467 ///
2468 /// **Note**: If you just want to send an event with custom JSON content to
2469 /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2470 ///
2471 /// If you want to set a transaction ID for the event, use
2472 /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2473 /// on the returned value before `.await`ing it.
2474 ///
2475 /// # Arguments
2476 ///
2477 /// * `content` - The content of the message event.
2478 ///
2479 /// # Examples
2480 ///
2481 /// ```no_run
2482 /// # use std::sync::{Arc, RwLock};
2483 /// # use matrix_sdk::{Client, config::SyncSettings};
2484 /// # use url::Url;
2485 /// # use matrix_sdk::ruma::room_id;
2486 /// # use serde::{Deserialize, Serialize};
2487 /// use matrix_sdk::ruma::{
2488 /// MilliSecondsSinceUnixEpoch, TransactionId,
2489 /// events::{
2490 /// macros::EventContent,
2491 /// room::message::{RoomMessageEventContent, TextMessageEventContent},
2492 /// },
2493 /// uint,
2494 /// };
2495 ///
2496 /// # async {
2497 /// # let homeserver = Url::parse("http://localhost:8080")?;
2498 /// # let mut client = Client::new(homeserver).await?;
2499 /// # let room_id = room_id!("!test:localhost");
2500 /// let content = RoomMessageEventContent::text_plain("Hello world");
2501 /// let txn_id = TransactionId::new();
2502 ///
2503 /// if let Some(room) = client.get_room(&room_id) {
2504 /// room.send(content).with_transaction_id(txn_id).await?;
2505 /// }
2506 ///
2507 /// // Custom events work too:
2508 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2509 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2510 /// struct TokenEventContent {
2511 /// token: String,
2512 /// #[serde(rename = "exp")]
2513 /// expires_at: MilliSecondsSinceUnixEpoch,
2514 /// }
2515 ///
2516 /// # fn generate_token() -> String { todo!() }
2517 /// let content = TokenEventContent {
2518 /// token: generate_token(),
2519 /// expires_at: {
2520 /// let now = MilliSecondsSinceUnixEpoch::now();
2521 /// MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2522 /// },
2523 /// };
2524 ///
2525 /// if let Some(room) = client.get_room(&room_id) {
2526 /// room.send(content).await?;
2527 /// }
2528 /// # anyhow::Ok(()) };
2529 /// ```
2530 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2531 SendMessageLikeEvent::new(self, content)
2532 }
2533
2534 /// Run /keys/query requests for all the non-tracked users, and for users
2535 /// with an out-of-date device list.
2536 #[cfg(feature = "e2e-encryption")]
2537 async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2538 let olm = self.client.olm_machine().await;
2539 let olm = olm.as_ref().expect("Olm machine wasn't started");
2540
2541 let members =
2542 self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2543
2544 let tracked: HashMap<_, _> = olm
2545 .store()
2546 .load_tracked_users()
2547 .await?
2548 .into_iter()
2549 .map(|tracked| (tracked.user_id, tracked.dirty))
2550 .collect();
2551
2552 // A member has no unknown devices iff it was tracked *and* the tracking is
2553 // not considered dirty.
2554 let members_with_unknown_devices =
2555 members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2556
2557 let (req_id, request) =
2558 olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2559
2560 if !request.device_keys.is_empty() {
2561 self.client.keys_query(&req_id, request.device_keys).await?;
2562 }
2563
2564 Ok(())
2565 }
2566
2567 /// Send a message-like event with custom JSON content to this room.
2568 ///
2569 /// Returns the parsed response from the server.
2570 ///
2571 /// If the encryption feature is enabled this method will transparently
2572 /// encrypt the event if this room is encrypted (except for `m.reaction`
2573 /// events, which are never encrypted).
2574 ///
2575 /// This method is equivalent to the [`send()`][Self::send] method but
2576 /// allows sending custom JSON payloads, e.g. constructed using the
2577 /// [`serde_json::json!()`] macro.
2578 ///
2579 /// If you want to set a transaction ID for the event, use
2580 /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2581 /// on the returned value before `.await`ing it.
2582 ///
2583 /// # Arguments
2584 ///
2585 /// * `event_type` - The type of the event.
2586 ///
2587 /// * `content` - The content of the event as a raw JSON value. The argument
2588 /// type can be `serde_json::Value`, but also other raw JSON types; for
2589 /// the full list check the documentation of
2590 /// [`IntoRawMessageLikeEventContent`].
2591 ///
2592 /// # Examples
2593 ///
2594 /// ```no_run
2595 /// # use std::sync::{Arc, RwLock};
2596 /// # use matrix_sdk::{Client, config::SyncSettings};
2597 /// # use url::Url;
2598 /// # use matrix_sdk::ruma::room_id;
2599 /// # async {
2600 /// # let homeserver = Url::parse("http://localhost:8080")?;
2601 /// # let mut client = Client::new(homeserver).await?;
2602 /// # let room_id = room_id!("!test:localhost");
2603 /// use serde_json::json;
2604 ///
2605 /// if let Some(room) = client.get_room(&room_id) {
2606 /// room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2607 /// }
2608 /// # anyhow::Ok(()) };
2609 /// ```
2610 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2611 pub fn send_raw<'a>(
2612 &'a self,
2613 event_type: &'a str,
2614 content: impl IntoRawMessageLikeEventContent,
2615 ) -> SendRawMessageLikeEvent<'a> {
2616 // Note: the recorded instrument fields are saved in
2617 // `SendRawMessageLikeEvent::into_future`.
2618 SendRawMessageLikeEvent::new(self, event_type, content)
2619 }
2620
2621 /// Send an attachment to this room.
2622 ///
2623 /// This will upload the given data that the reader produces using the
2624 /// [`upload()`] method and post an event to the given room.
2625 /// If the room is encrypted and the encryption feature is enabled the
2626 /// upload will be encrypted.
2627 ///
2628 /// This is a convenience method that calls the
2629 /// [`upload()`] and afterwards the [`send()`].
2630 ///
2631 /// # Arguments
2632 /// * `filename` - The file name.
2633 ///
2634 /// * `content_type` - The type of the media, this will be used as the
2635 /// content-type header.
2636 ///
2637 /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2638 /// media.
2639 ///
2640 /// * `config` - Metadata and configuration for the attachment.
2641 ///
2642 /// # Examples
2643 ///
2644 /// ```no_run
2645 /// # use std::fs;
2646 /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2647 /// # use url::Url;
2648 /// # use mime;
2649 /// # async {
2650 /// # let homeserver = Url::parse("http://localhost:8080")?;
2651 /// # let mut client = Client::new(homeserver).await?;
2652 /// # let room_id = room_id!("!test:localhost");
2653 /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2654 ///
2655 /// if let Some(room) = client.get_room(&room_id) {
2656 /// room.send_attachment(
2657 /// "my_favorite_cat.jpg",
2658 /// &mime::IMAGE_JPEG,
2659 /// image,
2660 /// AttachmentConfig::new(),
2661 /// ).await?;
2662 /// }
2663 /// # anyhow::Ok(()) };
2664 /// ```
2665 ///
2666 /// [`upload()`]: crate::Media::upload
2667 /// [`send()`]: Self::send
2668 #[instrument(skip_all)]
2669 pub fn send_attachment<'a>(
2670 &'a self,
2671 filename: impl Into<String>,
2672 content_type: &'a Mime,
2673 data: Vec<u8>,
2674 config: AttachmentConfig,
2675 ) -> SendAttachment<'a> {
2676 SendAttachment::new(self, filename.into(), content_type, data, config)
2677 }
2678
2679 /// Prepare and send an attachment to this room.
2680 ///
2681 /// This will upload the given data that the reader produces using the
2682 /// [`upload()`](#method.upload) method and post an event to the given room.
2683 /// If the room is encrypted and the encryption feature is enabled the
2684 /// upload will be encrypted.
2685 ///
2686 /// This is a convenience method that calls the
2687 /// [`Client::upload()`](#Client::method.upload) and afterwards the
2688 /// [`send()`](#method.send).
2689 ///
2690 /// # Arguments
2691 /// * `filename` - The file name.
2692 ///
2693 /// * `content_type` - The type of the media, this will be used as the
2694 /// content-type header.
2695 ///
2696 /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2697 /// media.
2698 ///
2699 /// * `config` - Metadata and configuration for the attachment.
2700 ///
2701 /// * `send_progress` - An observable to transmit forward progress about the
2702 /// upload.
2703 ///
2704 /// * `store_in_cache` - A boolean defining whether the uploaded media will
2705 /// be stored in the cache immediately after a successful upload.
2706 #[instrument(skip_all)]
2707 pub(super) async fn prepare_and_send_attachment<'a>(
2708 &'a self,
2709 filename: String,
2710 content_type: &'a Mime,
2711 data: Vec<u8>,
2712 mut config: AttachmentConfig,
2713 send_progress: SharedObservable<TransmissionProgress>,
2714 store_in_cache: bool,
2715 ) -> Result<send_message_event::v3::Response> {
2716 self.ensure_room_joined()?;
2717
2718 let txn_id = config.txn_id.take();
2719 let mentions = config.mentions.take();
2720
2721 let thumbnail = config.thumbnail.take();
2722
2723 // If necessary, store caching data for the thumbnail ahead of time.
2724 let thumbnail_cache_info = if store_in_cache {
2725 thumbnail
2726 .as_ref()
2727 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2728 } else {
2729 None
2730 };
2731
2732 #[cfg(feature = "e2e-encryption")]
2733 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2734 self.client
2735 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2736 .await?
2737 } else {
2738 self.client
2739 .media()
2740 .upload_plain_media_and_thumbnail(
2741 content_type,
2742 // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2743 // similar.
2744 data.clone(),
2745 thumbnail,
2746 send_progress,
2747 )
2748 .await?
2749 };
2750
2751 #[cfg(not(feature = "e2e-encryption"))]
2752 let (media_source, thumbnail) = self
2753 .client
2754 .media()
2755 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2756 .await?;
2757
2758 if store_in_cache {
2759 let media_store_lock_guard = self.client.media_store().lock().await?;
2760
2761 // A failure to cache shouldn't prevent the whole upload from finishing
2762 // properly, so only log errors during caching.
2763
2764 debug!("caching the media");
2765 let request =
2766 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2767
2768 if let Err(err) = media_store_lock_guard
2769 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2770 .await
2771 {
2772 warn!("unable to cache the media after uploading it: {err}");
2773 }
2774
2775 if let Some(((data, height, width), source)) =
2776 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2777 {
2778 debug!("caching the thumbnail");
2779
2780 let request = MediaRequestParameters {
2781 source: source.clone(),
2782 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2783 };
2784
2785 if let Err(err) = media_store_lock_guard
2786 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2787 .await
2788 {
2789 warn!("unable to cache the media after uploading it: {err}");
2790 }
2791 }
2792 }
2793
2794 let content = self
2795 .make_media_event(
2796 Room::make_attachment_type(
2797 content_type,
2798 filename,
2799 media_source,
2800 config.caption,
2801 config.info,
2802 thumbnail,
2803 ),
2804 mentions,
2805 config.reply,
2806 )
2807 .await?;
2808
2809 let mut fut = self.send(content);
2810 if let Some(txn_id) = txn_id {
2811 fut = fut.with_transaction_id(txn_id);
2812 }
2813
2814 fut.await.map(|result| result.response)
2815 }
2816
2817 /// Creates the inner [`MessageType`] for an already-uploaded media file
2818 /// provided by its source.
2819 #[allow(clippy::too_many_arguments)]
2820 pub(crate) fn make_attachment_type(
2821 content_type: &Mime,
2822 filename: String,
2823 source: MediaSource,
2824 caption: Option<TextMessageEventContent>,
2825 info: Option<AttachmentInfo>,
2826 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2827 ) -> MessageType {
2828 make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2829 }
2830
2831 /// Creates the [`RoomMessageEventContent`] based on the message type,
2832 /// mentions and reply information.
2833 pub(crate) async fn make_media_event(
2834 &self,
2835 msg_type: MessageType,
2836 mentions: Option<Mentions>,
2837 reply: Option<Reply>,
2838 ) -> Result<RoomMessageEventContent> {
2839 let mut content = RoomMessageEventContent::new(msg_type);
2840 if let Some(mentions) = mentions {
2841 content = content.add_mentions(mentions);
2842 }
2843 if let Some(reply) = reply {
2844 // Since we just created the event, there is no relation attached to it. Thus,
2845 // it is safe to add the reply relation without overriding anything.
2846 content = self.make_reply_event(content.into(), reply).await?;
2847 }
2848 Ok(content)
2849 }
2850
2851 /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
2852 /// provided by its source.
2853 #[cfg(feature = "unstable-msc4274")]
2854 #[allow(clippy::too_many_arguments)]
2855 pub(crate) fn make_gallery_item_type(
2856 content_type: &Mime,
2857 filename: String,
2858 source: MediaSource,
2859 caption: Option<TextMessageEventContent>,
2860 info: Option<AttachmentInfo>,
2861 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2862 ) -> GalleryItemType {
2863 make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2864 }
2865
2866 /// Update the power levels of a select set of users of this room.
2867 ///
2868 /// Issue a `power_levels` state event request to the server, changing the
2869 /// given UserId -> Int levels. May fail if the `power_levels` aren't
2870 /// locally known yet or the server rejects the state event update, e.g.
2871 /// because of insufficient permissions. Neither permissions to update
2872 /// nor whether the data might be stale is checked prior to issuing the
2873 /// request.
2874 pub async fn update_power_levels(
2875 &self,
2876 updates: Vec<(&UserId, Int)>,
2877 ) -> Result<send_state_event::v3::Response> {
2878 let mut power_levels = self.power_levels().await?;
2879
2880 for (user_id, new_level) in updates {
2881 if new_level == power_levels.users_default {
2882 power_levels.users.remove(user_id);
2883 } else {
2884 power_levels.users.insert(user_id.to_owned(), new_level);
2885 }
2886 }
2887
2888 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2889 }
2890
2891 /// Applies a set of power level changes to this room.
2892 ///
2893 /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2894 /// remain unchanged.
2895 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2896 let mut power_levels = self.power_levels().await?;
2897 power_levels.apply(changes)?;
2898 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2899 Ok(())
2900 }
2901
2902 /// Resets the room's power levels to the default values
2903 ///
2904 /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2905 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2906 let creators = self.creators().unwrap_or_default();
2907 let rules = self.clone_info().room_version_rules_or_default();
2908
2909 let default_power_levels =
2910 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2911 let changes = RoomPowerLevelChanges::from(default_power_levels);
2912 self.apply_power_level_changes(changes).await?;
2913 Ok(self.power_levels().await?)
2914 }
2915
2916 /// Gets the suggested role for the user with the provided `user_id`.
2917 ///
2918 /// This method checks the `RoomPowerLevels` events instead of loading the
2919 /// member list and looking for the member.
2920 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2921 let power_level = self.get_user_power_level(user_id).await?;
2922 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2923 }
2924
2925 /// Gets the power level the user with the provided `user_id`.
2926 ///
2927 /// This method checks the `RoomPowerLevels` events instead of loading the
2928 /// member list and looking for the member.
2929 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2930 let event = self.power_levels().await?;
2931 Ok(event.for_user(user_id))
2932 }
2933
2934 /// Gets a map with the `UserId` of users with power levels other than `0`
2935 /// and this power level.
2936 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2937 let power_levels = self.power_levels().await.ok();
2938 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2939 if let Some(power_levels) = power_levels {
2940 for (id, level) in power_levels.users.into_iter() {
2941 user_power_levels.insert(id, level.into());
2942 }
2943 }
2944 user_power_levels
2945 }
2946
2947 /// Sets the name of this room.
2948 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2949 self.send_state_event(RoomNameEventContent::new(name)).await
2950 }
2951
2952 /// Sets a new topic for this room.
2953 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2954 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2955 }
2956
2957 /// Sets the new avatar url for this room.
2958 ///
2959 /// # Arguments
2960 /// * `avatar_url` - The owned Matrix uri that represents the avatar
2961 /// * `info` - The optional image info that can be provided for the avatar
2962 pub async fn set_avatar_url(
2963 &self,
2964 url: &MxcUri,
2965 info: Option<avatar::ImageInfo>,
2966 ) -> Result<send_state_event::v3::Response> {
2967 self.ensure_room_joined()?;
2968
2969 let mut room_avatar_event = RoomAvatarEventContent::new();
2970 room_avatar_event.url = Some(url.to_owned());
2971 room_avatar_event.info = info.map(Box::new);
2972
2973 self.send_state_event(room_avatar_event).await
2974 }
2975
2976 /// Removes the avatar from the room
2977 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2978 self.send_state_event(RoomAvatarEventContent::new()).await
2979 }
2980
2981 /// Uploads a new avatar for this room.
2982 ///
2983 /// # Arguments
2984 /// * `mime` - The mime type describing the data
2985 /// * `data` - The data representation of the avatar
2986 /// * `info` - The optional image info provided for the avatar, the blurhash
2987 /// and the mimetype will always be updated
2988 pub async fn upload_avatar(
2989 &self,
2990 mime: &Mime,
2991 data: Vec<u8>,
2992 info: Option<avatar::ImageInfo>,
2993 ) -> Result<send_state_event::v3::Response> {
2994 self.ensure_room_joined()?;
2995
2996 let upload_response = self.client.media().upload(mime, data, None).await?;
2997 let mut info = info.unwrap_or_default();
2998 info.blurhash = upload_response.blurhash;
2999 info.mimetype = Some(mime.to_string());
3000
3001 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
3002 }
3003
3004 /// Send a state event with an empty state key to the homeserver.
3005 ///
3006 /// For state events with a non-empty state key, see
3007 /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3008 ///
3009 /// Returns the parsed response from the server.
3010 ///
3011 /// # Arguments
3012 ///
3013 /// * `content` - The content of the state event.
3014 ///
3015 /// # Examples
3016 ///
3017 /// ```no_run
3018 /// # use serde::{Deserialize, Serialize};
3019 /// # async {
3020 /// # let joined_room: matrix_sdk::Room = todo!();
3021 /// use matrix_sdk::ruma::{
3022 /// EventEncryptionAlgorithm,
3023 /// events::{
3024 /// EmptyStateKey, macros::EventContent,
3025 /// room::encryption::RoomEncryptionEventContent,
3026 /// },
3027 /// };
3028 ///
3029 /// let encryption_event_content = RoomEncryptionEventContent::new(
3030 /// EventEncryptionAlgorithm::MegolmV1AesSha2,
3031 /// );
3032 /// joined_room.send_state_event(encryption_event_content).await?;
3033 ///
3034 /// // Custom event:
3035 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3036 /// #[ruma_event(
3037 /// type = "org.matrix.msc_9000.xxx",
3038 /// kind = State,
3039 /// state_key_type = EmptyStateKey,
3040 /// )]
3041 /// struct XxxStateEventContent {/* fields... */}
3042 ///
3043 /// let content: XxxStateEventContent = todo!();
3044 /// joined_room.send_state_event(content).await?;
3045 /// # anyhow::Ok(()) };
3046 /// ```
3047 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3048 #[instrument(skip_all)]
3049 pub async fn send_state_event(
3050 &self,
3051 content: impl StateEventContent<StateKey = EmptyStateKey>,
3052 ) -> Result<send_state_event::v3::Response> {
3053 self.send_state_event_for_key(&EmptyStateKey, content).await
3054 }
3055
3056 /// Send a state event with an empty state key to the homeserver.
3057 ///
3058 /// For state events with a non-empty state key, see
3059 /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3060 ///
3061 /// If the experimental state event encryption feature is enabled, this
3062 /// method will transparently encrypt the event if this room is
3063 /// encrypted (except if the event type is considered critical for the room
3064 /// to function, as outlined in [MSC4362][msc4362]).
3065 ///
3066 /// Returns the parsed response from the server.
3067 ///
3068 /// # Arguments
3069 ///
3070 /// * `content` - The content of the state event.
3071 ///
3072 /// # Examples
3073 ///
3074 /// ```no_run
3075 /// # use serde::{Deserialize, Serialize};
3076 /// # async {
3077 /// # let joined_room: matrix_sdk::Room = todo!();
3078 /// use matrix_sdk::ruma::{
3079 /// EventEncryptionAlgorithm,
3080 /// events::{
3081 /// EmptyStateKey, macros::EventContent,
3082 /// room::encryption::RoomEncryptionEventContent,
3083 /// },
3084 /// };
3085 ///
3086 /// let encryption_event_content = RoomEncryptionEventContent::new(
3087 /// EventEncryptionAlgorithm::MegolmV1AesSha2,
3088 /// );
3089 /// joined_room.send_state_event(encryption_event_content).await?;
3090 ///
3091 /// // Custom event:
3092 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3093 /// #[ruma_event(
3094 /// type = "org.matrix.msc_9000.xxx",
3095 /// kind = State,
3096 /// state_key_type = EmptyStateKey,
3097 /// )]
3098 /// struct XxxStateEventContent {/* fields... */}
3099 ///
3100 /// let content: XxxStateEventContent = todo!();
3101 /// joined_room.send_state_event(content).await?;
3102 /// # anyhow::Ok(()) };
3103 /// ```
3104 ///
3105 /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/4362-encrypted-state.md
3106 #[cfg(feature = "experimental-encrypted-state-events")]
3107 #[instrument(skip_all)]
3108 pub fn send_state_event<'a>(
3109 &'a self,
3110 content: impl StateEventContent<StateKey = EmptyStateKey>,
3111 ) -> SendStateEvent<'a> {
3112 self.send_state_event_for_key(&EmptyStateKey, content)
3113 }
3114
3115 /// Send a state event to the homeserver.
3116 ///
3117 /// Returns the parsed response from the server.
3118 ///
3119 /// # Arguments
3120 ///
3121 /// * `content` - The content of the state event.
3122 ///
3123 /// * `state_key` - A unique key which defines the overwriting semantics for
3124 /// this piece of room state.
3125 ///
3126 /// # Examples
3127 ///
3128 /// ```no_run
3129 /// # use serde::{Deserialize, Serialize};
3130 /// # async {
3131 /// # let joined_room: matrix_sdk::Room = todo!();
3132 /// use matrix_sdk::ruma::{
3133 /// events::{
3134 /// macros::EventContent,
3135 /// room::member::{RoomMemberEventContent, MembershipState},
3136 /// },
3137 /// mxc_uri,
3138 /// };
3139 ///
3140 /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3141 /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3142 /// content.avatar_url = Some(avatar_url);
3143 ///
3144 /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3145 ///
3146 /// // Custom event:
3147 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3148 /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3149 /// struct XxxStateEventContent { /* fields... */ }
3150 ///
3151 /// let content: XxxStateEventContent = todo!();
3152 /// joined_room.send_state_event_for_key("foo", content).await?;
3153 /// # anyhow::Ok(()) };
3154 /// ```
3155 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3156 pub async fn send_state_event_for_key<C, K>(
3157 &self,
3158 state_key: &K,
3159 content: C,
3160 ) -> Result<send_state_event::v3::Response>
3161 where
3162 C: StateEventContent,
3163 C::StateKey: Borrow<K>,
3164 K: AsRef<str> + ?Sized,
3165 {
3166 self.ensure_room_joined()?;
3167 let request =
3168 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3169 let response = self.client.send(request).await?;
3170 Ok(response)
3171 }
3172
3173 /// Send a state event to the homeserver. If state encryption is enabled in
3174 /// this room, the event will be encrypted.
3175 ///
3176 /// If the experimental state event encryption feature is enabled, this
3177 /// method will transparently encrypt the event if this room is
3178 /// encrypted (except if the event type is considered critical for the room
3179 /// to function, as outlined in [MSC4362][msc4362]).
3180 ///
3181 /// Returns the parsed response from the server.
3182 ///
3183 /// # Arguments
3184 ///
3185 /// * `content` - The content of the state event.
3186 ///
3187 /// * `state_key` - A unique key which defines the overwriting semantics for
3188 /// this piece of room state.
3189 ///
3190 /// # Examples
3191 ///
3192 /// ```no_run
3193 /// # use serde::{Deserialize, Serialize};
3194 /// # async {
3195 /// # let joined_room: matrix_sdk::Room = todo!();
3196 /// use matrix_sdk::ruma::{
3197 /// events::{
3198 /// macros::EventContent,
3199 /// room::member::{RoomMemberEventContent, MembershipState},
3200 /// },
3201 /// mxc_uri,
3202 /// };
3203 ///
3204 /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3205 /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3206 /// content.avatar_url = Some(avatar_url);
3207 ///
3208 /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3209 ///
3210 /// // Custom event:
3211 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3212 /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3213 /// struct XxxStateEventContent { /* fields... */ }
3214 ///
3215 /// let content: XxxStateEventContent = todo!();
3216 /// joined_room.send_state_event_for_key("foo", content).await?;
3217 /// # anyhow::Ok(()) };
3218 /// ```
3219 ///
3220 /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3221 #[cfg(feature = "experimental-encrypted-state-events")]
3222 pub fn send_state_event_for_key<'a, C, K>(
3223 &'a self,
3224 state_key: &K,
3225 content: C,
3226 ) -> SendStateEvent<'a>
3227 where
3228 C: StateEventContent,
3229 C::StateKey: Borrow<K>,
3230 K: AsRef<str> + ?Sized,
3231 {
3232 SendStateEvent::new(self, state_key, content)
3233 }
3234
3235 /// Send a raw room state event to the homeserver.
3236 ///
3237 /// Returns the parsed response from the server.
3238 ///
3239 /// # Arguments
3240 ///
3241 /// * `event_type` - The type of the event that we're sending out.
3242 ///
3243 /// * `state_key` - A unique key which defines the overwriting semantics for
3244 /// this piece of room state. This value is often a zero-length string.
3245 ///
3246 /// * `content` - The content of the event as a raw JSON value. The argument
3247 /// type can be `serde_json::Value`, but also other raw JSON types; for
3248 /// the full list check the documentation of [`IntoRawStateEventContent`].
3249 ///
3250 /// # Examples
3251 ///
3252 /// ```no_run
3253 /// use serde_json::json;
3254 ///
3255 /// # async {
3256 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3257 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3258 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3259 ///
3260 /// if let Some(room) = client.get_room(&room_id) {
3261 /// room.send_state_event_raw("m.room.member", "", json!({
3262 /// "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3263 /// "displayname": "Alice Margatroid",
3264 /// "membership": "join",
3265 /// })).await?;
3266 /// }
3267 /// # anyhow::Ok(()) };
3268 /// ```
3269 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3270 #[instrument(skip_all)]
3271 pub async fn send_state_event_raw(
3272 &self,
3273 event_type: &str,
3274 state_key: &str,
3275 content: impl IntoRawStateEventContent,
3276 ) -> Result<send_state_event::v3::Response> {
3277 self.ensure_room_joined()?;
3278
3279 let request = send_state_event::v3::Request::new_raw(
3280 self.room_id().to_owned(),
3281 event_type.into(),
3282 state_key.to_owned(),
3283 content.into_raw_state_event_content(),
3284 );
3285
3286 Ok(self.client.send(request).await?)
3287 }
3288
3289 /// Send a raw room state event to the homeserver.
3290 ///
3291 /// If the experimental state event encryption feature is enabled, this
3292 /// method will transparently encrypt the event if this room is
3293 /// encrypted (except if the event type is considered critical for the room
3294 /// to function, as outlined in [MSC4362][msc4362]).
3295 ///
3296 /// Returns the parsed response from the server.
3297 ///
3298 /// # Arguments
3299 ///
3300 /// * `event_type` - The type of the event that we're sending out.
3301 ///
3302 /// * `state_key` - A unique key which defines the overwriting semantics for
3303 /// this piece of room state. This value is often a zero-length string.
3304 ///
3305 /// * `content` - The content of the event as a raw JSON value. The argument
3306 /// type can be `serde_json::Value`, but also other raw JSON types; for
3307 /// the full list check the documentation of [`IntoRawStateEventContent`].
3308 ///
3309 /// # Examples
3310 ///
3311 /// ```no_run
3312 /// use serde_json::json;
3313 ///
3314 /// # async {
3315 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3316 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3317 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3318 ///
3319 /// if let Some(room) = client.get_room(&room_id) {
3320 /// room.send_state_event_raw("m.room.member", "", json!({
3321 /// "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3322 /// "displayname": "Alice Margatroid",
3323 /// "membership": "join",
3324 /// })).await?;
3325 /// }
3326 /// # anyhow::Ok(()) };
3327 /// ```
3328 ///
3329 /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3330 #[cfg(feature = "experimental-encrypted-state-events")]
3331 #[instrument(skip_all)]
3332 pub fn send_state_event_raw<'a>(
3333 &'a self,
3334 event_type: &'a str,
3335 state_key: &'a str,
3336 content: impl IntoRawStateEventContent,
3337 ) -> SendRawStateEvent<'a> {
3338 SendRawStateEvent::new(self, event_type, state_key, content)
3339 }
3340
3341 /// Strips all information out of an event of the room.
3342 ///
3343 /// Returns the [`redact_event::v3::Response`] from the server.
3344 ///
3345 /// This cannot be undone. Users may redact their own events, and any user
3346 /// with a power level greater than or equal to the redact power level of
3347 /// the room may redact events there.
3348 ///
3349 /// # Arguments
3350 ///
3351 /// * `event_id` - The ID of the event to redact
3352 ///
3353 /// * `reason` - The reason for the event being redacted.
3354 ///
3355 /// * `txn_id` - A unique ID that can be attached to this event as
3356 /// its transaction ID. If not given one is created for the message.
3357 ///
3358 /// # Examples
3359 ///
3360 /// ```no_run
3361 /// use matrix_sdk::ruma::event_id;
3362 ///
3363 /// # async {
3364 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3365 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3366 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3367 /// #
3368 /// if let Some(room) = client.get_room(&room_id) {
3369 /// let event_id = event_id!("$xxxxxx:example.org");
3370 /// let reason = Some("Indecent material");
3371 /// room.redact(&event_id, reason, None).await?;
3372 /// }
3373 /// # anyhow::Ok(()) };
3374 /// ```
3375 #[instrument(skip_all)]
3376 pub async fn redact(
3377 &self,
3378 event_id: &EventId,
3379 reason: Option<&str>,
3380 txn_id: Option<OwnedTransactionId>,
3381 ) -> HttpResult<redact_event::v3::Response> {
3382 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3383 let request = assign!(
3384 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3385 { reason: reason.map(ToOwned::to_owned) }
3386 );
3387
3388 self.client.send(request).await
3389 }
3390
3391 /// Get a list of servers that should know this room.
3392 ///
3393 /// Uses the synced members of the room and the suggested [routing
3394 /// algorithm] from the Matrix spec.
3395 ///
3396 /// Returns at most three servers.
3397 ///
3398 /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3399 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3400 let acl_ev = self
3401 .get_state_event_static::<RoomServerAclEventContent>()
3402 .await?
3403 .and_then(|ev| ev.deserialize().ok());
3404 let acl = acl_ev.as_ref().and_then(|ev| match ev {
3405 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3406 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3407 });
3408
3409 // Filter out server names that:
3410 // - Are blocked due to server ACLs
3411 // - Are IP addresses
3412 let members: Vec<_> = self
3413 .members_no_sync(RoomMemberships::JOIN)
3414 .await?
3415 .into_iter()
3416 .filter(|member| {
3417 let server = member.user_id().server_name();
3418 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3419 })
3420 .collect();
3421
3422 // Get the server of the highest power level user in the room, provided
3423 // they are at least power level 50.
3424 let max = members
3425 .iter()
3426 .max_by_key(|member| member.power_level())
3427 .filter(|max| max.power_level() >= int!(50))
3428 .map(|member| member.user_id().server_name());
3429
3430 // Sort the servers by population.
3431 let servers = members
3432 .iter()
3433 .map(|member| member.user_id().server_name())
3434 .filter(|server| max.filter(|max| max == server).is_none())
3435 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3436 *servers.entry(server).or_default() += 1;
3437 servers
3438 });
3439 let mut servers: Vec<_> = servers.into_iter().collect();
3440 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3441
3442 Ok(max
3443 .into_iter()
3444 .chain(servers.into_iter().map(|(name, _)| name))
3445 .take(3)
3446 .map(ToOwned::to_owned)
3447 .collect())
3448 }
3449
3450 /// Get a `matrix.to` permalink to this room.
3451 ///
3452 /// If this room has an alias, we use it. Otherwise, we try to use the
3453 /// synced members in the room for [routing] the room ID.
3454 ///
3455 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3456 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3457 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3458 return Ok(alias.matrix_to_uri());
3459 }
3460
3461 let via = self.route().await?;
3462 Ok(self.room_id().matrix_to_uri_via(via))
3463 }
3464
3465 /// Get a `matrix:` permalink to this room.
3466 ///
3467 /// If this room has an alias, we use it. Otherwise, we try to use the
3468 /// synced members in the room for [routing] the room ID.
3469 ///
3470 /// # Arguments
3471 ///
3472 /// * `join` - Whether the user should join the room.
3473 ///
3474 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3475 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3476 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3477 return Ok(alias.matrix_uri(join));
3478 }
3479
3480 let via = self.route().await?;
3481 Ok(self.room_id().matrix_uri_via(via, join))
3482 }
3483
3484 /// Get a `matrix.to` permalink to an event in this room.
3485 ///
3486 /// We try to use the synced members in the room for [routing] the room ID.
3487 ///
3488 /// *Note*: This method does not check if the given event ID is actually
3489 /// part of this room. It needs to be checked before calling this method
3490 /// otherwise the permalink won't work.
3491 ///
3492 /// # Arguments
3493 ///
3494 /// * `event_id` - The ID of the event.
3495 ///
3496 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3497 pub async fn matrix_to_event_permalink(
3498 &self,
3499 event_id: impl Into<OwnedEventId>,
3500 ) -> Result<MatrixToUri> {
3501 // Don't use the alias because an event is tied to a room ID, but an
3502 // alias might point to another room, e.g. after a room upgrade.
3503 let via = self.route().await?;
3504 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3505 }
3506
3507 /// Get a `matrix:` permalink to an event in this room.
3508 ///
3509 /// We try to use the synced members in the room for [routing] the room ID.
3510 ///
3511 /// *Note*: This method does not check if the given event ID is actually
3512 /// part of this room. It needs to be checked before calling this method
3513 /// otherwise the permalink won't work.
3514 ///
3515 /// # Arguments
3516 ///
3517 /// * `event_id` - The ID of the event.
3518 ///
3519 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3520 pub async fn matrix_event_permalink(
3521 &self,
3522 event_id: impl Into<OwnedEventId>,
3523 ) -> Result<MatrixUri> {
3524 // Don't use the alias because an event is tied to a room ID, but an
3525 // alias might point to another room, e.g. after a room upgrade.
3526 let via = self.route().await?;
3527 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3528 }
3529
3530 /// Get the latest receipt of a user in this room.
3531 ///
3532 /// # Arguments
3533 ///
3534 /// * `receipt_type` - The type of receipt to get.
3535 ///
3536 /// * `thread` - The thread containing the event of the receipt, if any.
3537 ///
3538 /// * `user_id` - The ID of the user.
3539 ///
3540 /// Returns the ID of the event on which the receipt applies and the
3541 /// receipt.
3542 pub async fn load_user_receipt(
3543 &self,
3544 receipt_type: ReceiptType,
3545 thread: ReceiptThread,
3546 user_id: &UserId,
3547 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3548 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3549 }
3550
3551 /// Load the receipts for an event in this room from storage.
3552 ///
3553 /// # Arguments
3554 ///
3555 /// * `receipt_type` - The type of receipt to get.
3556 ///
3557 /// * `thread` - The thread containing the event of the receipt, if any.
3558 ///
3559 /// * `event_id` - The ID of the event.
3560 ///
3561 /// Returns a list of IDs of users who have sent a receipt for the event and
3562 /// the corresponding receipts.
3563 pub async fn load_event_receipts(
3564 &self,
3565 receipt_type: ReceiptType,
3566 thread: ReceiptThread,
3567 event_id: &EventId,
3568 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3569 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3570 }
3571
3572 /// Get the push-condition context for this room.
3573 ///
3574 /// Returns `None` if some data couldn't be found. This should only happen
3575 /// in brand new rooms, while we process its state.
3576 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3577 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions().await?)
3578 .await
3579 }
3580
3581 /// Get the push-condition context for this room, with a choice to include
3582 /// thread subscriptions or not, based on the extra
3583 /// `with_threads_subscriptions` parameter.
3584 ///
3585 /// Returns `None` if some data couldn't be found. This should only happen
3586 /// in brand new rooms, while we process its state.
3587 pub(crate) async fn push_condition_room_ctx_internal(
3588 &self,
3589 with_threads_subscriptions: bool,
3590 ) -> Result<Option<PushConditionRoomCtx>> {
3591 let room_id = self.room_id();
3592 let user_id = self.own_user_id();
3593 let room_info = self.clone_info();
3594 let member_count = room_info.active_members_count();
3595
3596 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3597 member.name().to_owned()
3598 } else {
3599 return Ok(None);
3600 };
3601
3602 let power_levels = match self.power_levels().await {
3603 Ok(power_levels) => Some(power_levels.into()),
3604 Err(error) => {
3605 if matches!(room_info.state(), RoomState::Joined) {
3606 // It's normal to not have the power levels in a non-joined room, so don't log
3607 // the error if the room is not joined
3608 error!("Could not compute power levels for push conditions: {error}");
3609 }
3610 None
3611 }
3612 };
3613
3614 let mut ctx = assign!(PushConditionRoomCtx::new(
3615 room_id.to_owned(),
3616 UInt::new(member_count).unwrap_or(UInt::MAX),
3617 user_id.to_owned(),
3618 user_display_name,
3619 ),
3620 {
3621 power_levels,
3622 });
3623
3624 if with_threads_subscriptions {
3625 let this = self.clone();
3626 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3627 let room = this.clone();
3628 Box::pin(async move {
3629 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3630 maybe_sub.is_some()
3631 } else {
3632 false
3633 }
3634 })
3635 });
3636 }
3637
3638 Ok(Some(ctx))
3639 }
3640
3641 /// Retrieves a [`PushContext`] that can be used to compute the push
3642 /// actions for events.
3643 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3644 self.push_context_internal(self.client.enabled_thread_subscriptions().await?).await
3645 }
3646
3647 /// Retrieves a [`PushContext`] that can be used to compute the push actions
3648 /// for events, with a choice to include thread subscriptions or not,
3649 /// based on the extra `with_threads_subscriptions` parameter.
3650 #[instrument(skip(self))]
3651 pub(crate) async fn push_context_internal(
3652 &self,
3653 with_threads_subscriptions: bool,
3654 ) -> Result<Option<PushContext>> {
3655 let Some(push_condition_room_ctx) =
3656 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3657 else {
3658 debug!("Could not aggregate push context");
3659 return Ok(None);
3660 };
3661 let push_rules = self.client().account().push_rules().await?;
3662 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3663 }
3664
3665 /// Get the push actions for the given event with the current room state.
3666 ///
3667 /// Note that it is possible that no push action is returned because the
3668 /// current room state does not have all the required state events.
3669 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3670 if let Some(ctx) = self.push_context().await? {
3671 Ok(Some(ctx.for_event(event).await))
3672 } else {
3673 Ok(None)
3674 }
3675 }
3676
3677 /// The membership details of the (latest) invite for the logged-in user in
3678 /// this room.
3679 pub async fn invite_details(&self) -> Result<Invite> {
3680 let state = self.state();
3681
3682 if state != RoomState::Invited {
3683 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3684 }
3685
3686 let invitee = self
3687 .get_member_no_sync(self.own_user_id())
3688 .await?
3689 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3690 let event = invitee.event();
3691
3692 let inviter_id = event.sender().to_owned();
3693 let inviter = self.get_member_no_sync(&inviter_id).await?;
3694
3695 Ok(Invite { invitee, inviter_id, inviter })
3696 }
3697
3698 /// Get the membership details for the current user.
3699 ///
3700 /// Returns:
3701 /// - If the user was present in the room, a
3702 /// [`RoomMemberWithSenderInfo`] containing both the user info and the
3703 /// member info of the sender of the `m.room.member` event.
3704 /// - If the current user is not present, an error.
3705 pub async fn member_with_sender_info(
3706 &self,
3707 user_id: &UserId,
3708 ) -> Result<RoomMemberWithSenderInfo> {
3709 let Some(member) = self.get_member_no_sync(user_id).await? else {
3710 return Err(Error::InsufficientData);
3711 };
3712
3713 let sender_member =
3714 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3715 // If the sender room member info is already available, return it
3716 Some(member)
3717 } else if self.are_members_synced() {
3718 // The room members are synced and we couldn't find the sender info
3719 None
3720 } else if self.sync_members().await.is_ok() {
3721 // Try getting the sender room member info again after syncing
3722 self.get_member_no_sync(member.event().sender()).await?
3723 } else {
3724 None
3725 };
3726
3727 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3728 }
3729
3730 /// Forget this room.
3731 ///
3732 /// This communicates to the homeserver that it should forget the room.
3733 ///
3734 /// Only left or banned-from rooms can be forgotten.
3735 pub async fn forget(&self) -> Result<()> {
3736 let state = self.state();
3737 match state {
3738 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3739 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3740 "Left / Banned",
3741 state,
3742 ))));
3743 }
3744 RoomState::Left | RoomState::Banned => {}
3745 }
3746
3747 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3748 let _response = self.client.send(request).await?;
3749
3750 // If it was a DM, remove the room from the `m.direct` global account data.
3751 if self.inner.direct_targets_length() != 0
3752 && let Err(e) = self.set_is_direct(false).await
3753 {
3754 // It is not important whether we managed to remove the room, it will not have
3755 // any consequences, so just log the error.
3756 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3757 }
3758
3759 self.client.base_client().forget_room(self.inner.room_id()).await?;
3760
3761 Ok(())
3762 }
3763
3764 fn ensure_room_joined(&self) -> Result<()> {
3765 let state = self.state();
3766 if state == RoomState::Joined {
3767 Ok(())
3768 } else {
3769 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3770 }
3771 }
3772
3773 /// Get the notification mode.
3774 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3775 if !matches!(self.state(), RoomState::Joined) {
3776 return None;
3777 }
3778
3779 let notification_settings = self.client().notification_settings().await;
3780
3781 // Get the user-defined mode if available
3782 let notification_mode =
3783 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3784
3785 if notification_mode.is_some() {
3786 notification_mode
3787 } else if let Ok(is_encrypted) =
3788 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3789 {
3790 // Otherwise, if encrypted status is available, get the default mode for this
3791 // type of room.
3792 // From the point of view of notification settings, a `one-to-one` room is one
3793 // that involves exactly two people.
3794 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3795 let default_mode = notification_settings
3796 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3797 .await;
3798 Some(default_mode)
3799 } else {
3800 None
3801 }
3802 }
3803
3804 /// Get the user-defined notification mode.
3805 ///
3806 /// The result is cached for fast and non-async call. To read the cached
3807 /// result, use
3808 /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3809 //
3810 // Note for maintainers:
3811 //
3812 // The fact the result is cached is an important property. If you change that in
3813 // the future, please review all calls to this method.
3814 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3815 if !matches!(self.state(), RoomState::Joined) {
3816 return None;
3817 }
3818
3819 let notification_settings = self.client().notification_settings().await;
3820
3821 // Get the user-defined mode if available.
3822 let mode =
3823 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3824
3825 if let Some(mode) = mode {
3826 self.update_cached_user_defined_notification_mode(mode);
3827 }
3828
3829 mode
3830 }
3831
3832 /// Report an event as inappropriate to the homeserver's administrator.
3833 ///
3834 /// # Arguments
3835 ///
3836 /// * `event_id` - The ID of the event to report.
3837 /// * `score` - The score to rate this content.
3838 /// * `reason` - The reason the content is being reported.
3839 ///
3840 /// # Errors
3841 ///
3842 /// Returns an error if the room is not joined or if an error occurs with
3843 /// the request.
3844 pub async fn report_content(
3845 &self,
3846 event_id: OwnedEventId,
3847 reason: Option<String>,
3848 ) -> Result<report_content::v3::Response> {
3849 let state = self.state();
3850 if state != RoomState::Joined {
3851 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3852 }
3853
3854 let request = assign!(
3855 report_content::v3::Request::new(
3856 self.inner.room_id().to_owned(),
3857 event_id,
3858 ), {
3859 reason: reason
3860 }
3861 );
3862 Ok(self.client.send(request).await?)
3863 }
3864
3865 /// Reports a room as inappropriate to the server.
3866 /// The caller is not required to be joined to the room to report it.
3867 ///
3868 /// # Arguments
3869 ///
3870 /// * `reason` - The reason the room is being reported.
3871 ///
3872 /// # Errors
3873 ///
3874 /// Returns an error if the room is not found or on rate limit
3875 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3876 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3877
3878 Ok(self.client.send(request).await?)
3879 }
3880
3881 /// Set a flag on the room to indicate that the user has explicitly marked
3882 /// it as (un)read.
3883 ///
3884 /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3885 /// value as `unread`.
3886 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3887 if self.is_marked_unread() == unread {
3888 // The request is not necessary.
3889 return Ok(());
3890 }
3891
3892 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3893
3894 let content = MarkedUnreadEventContent::new(unread);
3895
3896 let request = set_room_account_data::v3::Request::new(
3897 user_id.to_owned(),
3898 self.inner.room_id().to_owned(),
3899 &content,
3900 )?;
3901
3902 self.client.send(request).await?;
3903 Ok(())
3904 }
3905
3906 /// Returns the [`RoomEventCache`] associated to this room, assuming the
3907 /// global [`EventCache`] has been enabled for subscription.
3908 pub async fn event_cache(
3909 &self,
3910 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3911 self.client.event_cache().for_room(self.room_id()).await
3912 }
3913
3914 /// Get the beacon information event in the room for the `user_id`.
3915 ///
3916 /// # Errors
3917 ///
3918 /// Returns an error if the event is redacted, stripped, not found or could
3919 /// not be deserialized.
3920 pub(crate) async fn get_user_beacon_info(
3921 &self,
3922 user_id: &UserId,
3923 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3924 let raw_event = self
3925 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3926 .await?
3927 .ok_or(BeaconError::NotFound)?;
3928
3929 match raw_event.deserialize()? {
3930 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3931 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3932 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3933 }
3934 }
3935
3936 /// Start sharing live location in the room.
3937 ///
3938 /// # Arguments
3939 ///
3940 /// * `duration_millis` - The duration for which the live location is
3941 /// shared, in milliseconds.
3942 /// * `description` - An optional description for the live location share.
3943 ///
3944 /// # Errors
3945 ///
3946 /// Returns an error if the room is not joined or if the state event could
3947 /// not be sent.
3948 pub async fn start_live_location_share(
3949 &self,
3950 duration_millis: u64,
3951 description: Option<String>,
3952 ) -> Result<send_state_event::v3::Response> {
3953 self.ensure_room_joined()?;
3954
3955 self.send_state_event_for_key(
3956 self.own_user_id(),
3957 BeaconInfoEventContent::new(
3958 description,
3959 Duration::from_millis(duration_millis),
3960 true,
3961 None,
3962 ),
3963 )
3964 .await
3965 }
3966
3967 /// Stop sharing live location in the room.
3968 ///
3969 /// # Errors
3970 ///
3971 /// Returns an error if the room is not joined, if the beacon information
3972 /// is redacted or stripped, or if the state event is not found.
3973 pub async fn stop_live_location_share(
3974 &self,
3975 ) -> Result<send_state_event::v3::Response, BeaconError> {
3976 self.ensure_room_joined()?;
3977
3978 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3979 beacon_info_event.content.stop();
3980 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3981 }
3982
3983 /// Send a location beacon event in the current room.
3984 ///
3985 /// # Arguments
3986 ///
3987 /// * `geo_uri` - The geo URI of the location beacon.
3988 ///
3989 /// # Errors
3990 ///
3991 /// Returns an error if the room is not joined, if the beacon information
3992 /// is redacted or stripped, if the location share is no longer live,
3993 /// or if the state event is not found.
3994 pub async fn send_location_beacon(
3995 &self,
3996 geo_uri: String,
3997 ) -> Result<send_message_event::v3::Response, BeaconError> {
3998 self.ensure_room_joined()?;
3999
4000 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
4001
4002 if beacon_info_event.content.is_live() {
4003 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
4004 Ok(self.send(content).await?.response)
4005 } else {
4006 Err(BeaconError::NotLive)
4007 }
4008 }
4009
4010 /// Store the given `ComposerDraft` in the state store using the current
4011 /// room id and optional thread root id as identifier.
4012 pub async fn save_composer_draft(
4013 &self,
4014 draft: ComposerDraft,
4015 thread_root: Option<&EventId>,
4016 ) -> Result<()> {
4017 self.client
4018 .state_store()
4019 .set_kv_data(
4020 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
4021 StateStoreDataValue::ComposerDraft(draft),
4022 )
4023 .await?;
4024 Ok(())
4025 }
4026
4027 /// Retrieve the `ComposerDraft` stored in the state store for this room
4028 /// and given thread, if any.
4029 pub async fn load_composer_draft(
4030 &self,
4031 thread_root: Option<&EventId>,
4032 ) -> Result<Option<ComposerDraft>> {
4033 let data = self
4034 .client
4035 .state_store()
4036 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4037 .await?;
4038 Ok(data.and_then(|d| d.into_composer_draft()))
4039 }
4040
4041 /// Remove the `ComposerDraft` stored in the state store for this room
4042 /// and given thread, if any.
4043 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
4044 self.client
4045 .state_store()
4046 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4047 .await?;
4048 Ok(())
4049 }
4050
4051 /// Load pinned state events for a room from the `/state` endpoint in the
4052 /// home server.
4053 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
4054 let response = self
4055 .client
4056 .send(get_state_event_for_key::v3::Request::new(
4057 self.room_id().to_owned(),
4058 StateEventType::RoomPinnedEvents,
4059 "".to_owned(),
4060 ))
4061 .await;
4062
4063 match response {
4064 Ok(response) => Ok(Some(
4065 response
4066 .into_content()
4067 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
4068 .pinned,
4069 )),
4070 Err(http_error) => match http_error.as_client_api_error() {
4071 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
4072 _ => Err(http_error.into()),
4073 },
4074 }
4075 }
4076
4077 /// Observe live location sharing events for this room.
4078 ///
4079 /// The returned observable will receive the newest event for each sync
4080 /// response that contains an `m.beacon` event.
4081 ///
4082 /// Returns a stream of [`ObservableLiveLocation`] events from other users
4083 /// in the room, excluding the live location events of the room's own user.
4084 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
4085 ObservableLiveLocation::new(&self.client, self.room_id())
4086 }
4087
4088 /// Subscribe to knock requests in this `Room`.
4089 ///
4090 /// The current requests to join the room will be emitted immediately
4091 /// when subscribing.
4092 ///
4093 /// A new set of knock requests will be emitted whenever:
4094 /// - A new member event is received.
4095 /// - A knock request is marked as seen.
4096 /// - A sync is gappy (limited), so room membership information may be
4097 /// outdated.
4098 ///
4099 /// Returns both a stream of knock requests and a handle for a task that
4100 /// will clean up the seen knock request ids when possible.
4101 pub async fn subscribe_to_knock_requests(
4102 &self,
4103 ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
4104 let this = Arc::new(self.clone());
4105
4106 let room_member_events_observer =
4107 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
4108
4109 let current_seen_ids = self.get_seen_knock_request_ids().await?;
4110 let mut seen_request_ids_stream = self
4111 .seen_knock_request_ids_map
4112 .subscribe()
4113 .await
4114 .map(|values| values.unwrap_or_default());
4115
4116 let mut room_info_stream = self.subscribe_info();
4117
4118 // Spawn a task that will clean up the seen knock request ids when updated room
4119 // members are received
4120 let clear_seen_ids_handle = spawn({
4121 let this = self.clone();
4122 async move {
4123 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
4124 while member_updates_stream.recv().await.is_ok() {
4125 // If room members were updated, try to remove outdated seen knock request ids
4126 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
4127 warn!("Failed to remove seen knock requests: {err}")
4128 }
4129 }
4130 }
4131 });
4132
4133 let combined_stream = stream! {
4134 // Emit current requests to join
4135 match this.get_current_join_requests(¤t_seen_ids).await {
4136 Ok(initial_requests) => yield initial_requests,
4137 Err(err) => warn!("Failed to get initial requests to join: {err}")
4138 }
4139
4140 let mut requests_stream = room_member_events_observer.subscribe();
4141 let mut seen_ids = current_seen_ids.clone();
4142
4143 loop {
4144 // This is equivalent to a combine stream operation, triggering a new emission
4145 // when any of the branches changes
4146 tokio::select! {
4147 Some((event, _)) = requests_stream.next() => {
4148 if let Some(event) = event.as_original() {
4149 // If we can calculate the membership change, try to emit only when needed
4150 let emit = if event.prev_content().is_some() {
4151 matches!(event.membership_change(),
4152 MembershipChange::Banned |
4153 MembershipChange::Knocked |
4154 MembershipChange::KnockAccepted |
4155 MembershipChange::KnockDenied |
4156 MembershipChange::KnockRetracted
4157 )
4158 } else {
4159 // If we can't calculate the membership change, assume we need to
4160 // emit updated values
4161 true
4162 };
4163
4164 if emit {
4165 match this.get_current_join_requests(&seen_ids).await {
4166 Ok(requests) => yield requests,
4167 Err(err) => {
4168 warn!("Failed to get updated knock requests on new member event: {err}")
4169 }
4170 }
4171 }
4172 }
4173 }
4174
4175 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4176 // Update the current seen ids
4177 seen_ids = new_seen_ids;
4178
4179 // If seen requests have changed we need to recalculate
4180 // all the knock requests
4181 match this.get_current_join_requests(&seen_ids).await {
4182 Ok(requests) => yield requests,
4183 Err(err) => {
4184 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4185 }
4186 }
4187 }
4188
4189 Some(room_info) = room_info_stream.next() => {
4190 // We need to emit new items when we may have missing room members:
4191 // this usually happens after a gappy (limited) sync
4192 if !room_info.are_members_synced() {
4193 match this.get_current_join_requests(&seen_ids).await {
4194 Ok(requests) => yield requests,
4195 Err(err) => {
4196 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4197 }
4198 }
4199 }
4200 }
4201 // If the streams in all branches are closed, stop the loop
4202 else => break,
4203 }
4204 }
4205 };
4206
4207 Ok((combined_stream, clear_seen_ids_handle))
4208 }
4209
4210 async fn get_current_join_requests(
4211 &self,
4212 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4213 ) -> Result<Vec<KnockRequest>> {
4214 Ok(self
4215 .members(RoomMemberships::KNOCK)
4216 .await?
4217 .into_iter()
4218 .filter_map(|member| {
4219 let event_id = member.event().event_id()?;
4220 Some(KnockRequest::new(
4221 self,
4222 event_id,
4223 member.event().timestamp(),
4224 KnockRequestMemberInfo::from_member(&member),
4225 seen_request_ids.contains_key(event_id),
4226 ))
4227 })
4228 .collect())
4229 }
4230
4231 /// Access the room settings related to privacy and visibility.
4232 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4233 RoomPrivacySettings::new(&self.inner, &self.client)
4234 }
4235
4236 /// Retrieve a list of all the threads for the current room.
4237 ///
4238 /// Since this client-server API is paginated, the return type may include a
4239 /// token used to resuming back-pagination into the list of results, in
4240 /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4241 /// [`ListThreadsOptions::from`] to continue the pagination
4242 /// from the previous position.
4243 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4244 let request = opts.into_request(self.room_id());
4245
4246 let response = self.client.send(request).await?;
4247
4248 let push_ctx = self.push_context().await?;
4249 let chunk = join_all(
4250 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4251 )
4252 .await;
4253
4254 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4255 }
4256
4257 /// Retrieve a list of relations for the given event, according to the given
4258 /// options, using the network.
4259 ///
4260 /// Since this client-server API is paginated, the return type may include a
4261 /// token used to resuming back-pagination into the list of results, in
4262 /// [`Relations::prev_batch_token`]. This token can be fed back into
4263 /// [`RelationsOptions::from`] to continue the pagination from the previous
4264 /// position.
4265 ///
4266 /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4267 /// then it must be used with the same
4268 /// [`RelationsOptions::include_relations`] value as the request that
4269 /// returns the `from` token, otherwise the server behavior is undefined.
4270 pub async fn relations(
4271 &self,
4272 event_id: OwnedEventId,
4273 opts: RelationsOptions,
4274 ) -> Result<Relations> {
4275 let relations = opts.send(self, event_id).await;
4276
4277 // Save any new related events to the cache.
4278 if let Ok(Relations { chunk, .. }) = &relations
4279 && let Ok((cache, _handles)) = self.event_cache().await
4280 {
4281 cache.save_events(chunk.clone()).await;
4282 }
4283
4284 relations
4285 }
4286
4287 /// Search this room's [`RoomIndex`] for query and return at most
4288 /// max_number_of_results results.
4289 #[cfg(feature = "experimental-search")]
4290 pub async fn search(
4291 &self,
4292 query: &str,
4293 max_number_of_results: usize,
4294 pagination_offset: Option<usize>,
4295 ) -> Result<Vec<OwnedEventId>, IndexError> {
4296 let mut search_index_guard = self.client.search_index().lock().await;
4297 search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4298 }
4299
4300 /// Subscribe to a given thread in this room.
4301 ///
4302 /// This will subscribe the user to the thread, so that they will receive
4303 /// notifications for that thread specifically.
4304 ///
4305 /// # Arguments
4306 ///
4307 /// - `thread_root`: The ID of the thread root event to subscribe to.
4308 /// - `automatic`: Whether the subscription was made automatically by a
4309 /// client, not by manual user choice. If set, must include the latest
4310 /// event ID that's known in the thread and that is causing the automatic
4311 /// subscription. If unset (i.e. we're now subscribing manually) and there
4312 /// was a previous automatic subscription, the subscription will be
4313 /// overridden to a manual one instead.
4314 ///
4315 /// # Returns
4316 ///
4317 /// - A 404 error if the event isn't known, or isn't a thread root.
4318 /// - An `Ok` result if the subscription was successful, or if the server
4319 /// skipped an automatic subscription (as the user unsubscribed from the
4320 /// thread after the event causing the automatic subscription).
4321 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4322 pub async fn subscribe_thread(
4323 &self,
4324 thread_root: OwnedEventId,
4325 automatic: Option<OwnedEventId>,
4326 ) -> Result<()> {
4327 let is_automatic = automatic.is_some();
4328
4329 match self
4330 .client
4331 .send(subscribe_thread::unstable::Request::new(
4332 self.room_id().to_owned(),
4333 thread_root.clone(),
4334 automatic,
4335 ))
4336 .await
4337 {
4338 Ok(_response) => {
4339 trace!("Server acknowledged the thread subscription; saving in db");
4340
4341 // Immediately save the result into the database.
4342 self.client
4343 .state_store()
4344 .upsert_thread_subscriptions(vec![(
4345 self.room_id(),
4346 &thread_root,
4347 StoredThreadSubscription {
4348 status: ThreadSubscriptionStatus::Subscribed {
4349 automatic: is_automatic,
4350 },
4351 bump_stamp: None,
4352 },
4353 )])
4354 .await?;
4355
4356 Ok(())
4357 }
4358
4359 Err(err) => {
4360 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4361 // In this case: the server indicates that the user unsubscribed *after* the
4362 // event ID we've used in an automatic subscription; don't
4363 // save the subscription state in the database, as the
4364 // previous one should be more correct.
4365 trace!("Thread subscription skipped: {err}");
4366 Ok(())
4367 } else {
4368 // Forward the error to the caller.
4369 Err(err.into())
4370 }
4371 }
4372 }
4373 }
4374
4375 /// Subscribe to a thread if needed, based on a current subscription to it.
4376 ///
4377 /// This is like [`Self::subscribe_thread`], but it first checks if the user
4378 /// has already subscribed to a thread, so as to minimize sending
4379 /// unnecessary subscriptions which would be ignored by the server.
4380 pub async fn subscribe_thread_if_needed(
4381 &self,
4382 thread_root: &EventId,
4383 automatic: Option<OwnedEventId>,
4384 ) -> Result<()> {
4385 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4386 // If we have a previous subscription, we should only send the new one if it's
4387 // manual and the previous one was automatic.
4388 if !prev_sub.automatic || automatic.is_some() {
4389 // Either we had already a manual subscription, or we had an automatic one and
4390 // the new one is automatic too: nothing to do!
4391 return Ok(());
4392 }
4393 }
4394 self.subscribe_thread(thread_root.to_owned(), automatic).await
4395 }
4396
4397 /// Unsubscribe from a given thread in this room.
4398 ///
4399 /// # Arguments
4400 ///
4401 /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4402 ///
4403 /// # Returns
4404 ///
4405 /// - An `Ok` result if the unsubscription was successful, or the thread was
4406 /// already unsubscribed.
4407 /// - A 404 error if the event isn't known, or isn't a thread root.
4408 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4409 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4410 self.client
4411 .send(unsubscribe_thread::unstable::Request::new(
4412 self.room_id().to_owned(),
4413 thread_root.clone(),
4414 ))
4415 .await?;
4416
4417 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4418
4419 // Immediately save the result into the database.
4420 self.client
4421 .state_store()
4422 .upsert_thread_subscriptions(vec![(
4423 self.room_id(),
4424 &thread_root,
4425 StoredThreadSubscription {
4426 status: ThreadSubscriptionStatus::Unsubscribed,
4427 bump_stamp: None,
4428 },
4429 )])
4430 .await?;
4431
4432 Ok(())
4433 }
4434
4435 /// Return the current thread subscription for the given thread root in this
4436 /// room.
4437 ///
4438 /// # Arguments
4439 ///
4440 /// - `thread_root`: The ID of the thread root event to get the subscription
4441 /// for.
4442 ///
4443 /// # Returns
4444 ///
4445 /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4446 /// subscription information.
4447 /// - An `Ok` result with `None` if the subscription does not exist, or the
4448 /// event couldn't be found, or the event isn't a thread.
4449 /// - An error if the request fails for any other reason, such as a network
4450 /// error.
4451 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4452 pub async fn fetch_thread_subscription(
4453 &self,
4454 thread_root: OwnedEventId,
4455 ) -> Result<Option<ThreadSubscription>> {
4456 let result = self
4457 .client
4458 .send(get_thread_subscription::unstable::Request::new(
4459 self.room_id().to_owned(),
4460 thread_root.clone(),
4461 ))
4462 .await;
4463
4464 let subscription = match result {
4465 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4466 Err(http_error) => match http_error.as_client_api_error() {
4467 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4468 _ => return Err(http_error.into()),
4469 },
4470 };
4471
4472 // Keep the database in sync.
4473 if let Some(sub) = &subscription {
4474 self.client
4475 .state_store()
4476 .upsert_thread_subscriptions(vec![(
4477 self.room_id(),
4478 &thread_root,
4479 StoredThreadSubscription {
4480 status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4481 bump_stamp: None,
4482 },
4483 )])
4484 .await?;
4485 } else {
4486 // If the subscription was not found, remove it from the database.
4487 self.client
4488 .state_store()
4489 .remove_thread_subscription(self.room_id(), &thread_root)
4490 .await?;
4491 }
4492
4493 Ok(subscription)
4494 }
4495
4496 /// Return the current thread subscription for the given thread root in this
4497 /// room, by getting it from storage if possible, or fetching it from
4498 /// network otherwise.
4499 ///
4500 /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4501 /// this method.
4502 pub async fn load_or_fetch_thread_subscription(
4503 &self,
4504 thread_root: &EventId,
4505 ) -> Result<Option<ThreadSubscription>> {
4506 // If the thread subscriptions list is outdated, fetch from the server.
4507 if self.client.thread_subscription_catchup().is_outdated() {
4508 return self.fetch_thread_subscription(thread_root.to_owned()).await;
4509 }
4510
4511 // Otherwise, we can rely on the store information.
4512 Ok(self
4513 .client
4514 .state_store()
4515 .load_thread_subscription(self.room_id(), thread_root)
4516 .await
4517 .map(|maybe_sub| {
4518 maybe_sub.and_then(|stored| match stored.status {
4519 ThreadSubscriptionStatus::Unsubscribed => None,
4520 ThreadSubscriptionStatus::Subscribed { automatic } => {
4521 Some(ThreadSubscription { automatic })
4522 }
4523 })
4524 })?)
4525 }
4526
4527 /// Adds a new pinned event by sending an updated `m.room.pinned_events`
4528 /// event containing the new event id.
4529 ///
4530 /// This method will first try to get the pinned events from the current
4531 /// room's state and if it fails to do so it'll try to load them from the
4532 /// homeserver.
4533 ///
4534 /// Returns `true` if we pinned the event, `false` if the event was already
4535 /// pinned.
4536 pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
4537 let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4538 event_ids
4539 } else {
4540 self.load_pinned_events().await?.unwrap_or_default()
4541 };
4542 let event_id = event_id.to_owned();
4543 if pinned_event_ids.contains(&event_id) {
4544 Ok(false)
4545 } else {
4546 pinned_event_ids.push(event_id);
4547 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4548 self.send_state_event(content).await?;
4549 Ok(true)
4550 }
4551 }
4552
4553 /// Removes a pinned event by sending an updated `m.room.pinned_events`
4554 /// event without the event id we want to remove.
4555 ///
4556 /// This method will first try to get the pinned events from the current
4557 /// room's state and if it fails to do so it'll try to load them from the
4558 /// homeserver.
4559 ///
4560 /// Returns `true` if we unpinned the event, `false` if the event wasn't
4561 /// pinned before.
4562 pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
4563 let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4564 event_ids
4565 } else {
4566 self.load_pinned_events().await?.unwrap_or_default()
4567 };
4568 let event_id = event_id.to_owned();
4569 if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
4570 pinned_event_ids.remove(idx);
4571 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4572 self.send_state_event(content).await?;
4573 Ok(true)
4574 } else {
4575 Ok(false)
4576 }
4577 }
4578}
4579
4580#[cfg(feature = "e2e-encryption")]
4581impl RoomIdentityProvider for Room {
4582 fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
4583 Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
4584 }
4585
4586 fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
4587 Box::pin(async {
4588 let members = self
4589 .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
4590 .await
4591 .unwrap_or_else(|_| Default::default());
4592
4593 let mut ret: Vec<UserIdentity> = Vec::new();
4594 for member in members {
4595 if let Some(i) = self.user_identity(member.user_id()).await {
4596 ret.push(i);
4597 }
4598 }
4599 ret
4600 })
4601 }
4602
4603 fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
4604 Box::pin(async {
4605 self.client
4606 .encryption()
4607 .get_user_identity(user_id)
4608 .await
4609 .unwrap_or(None)
4610 .map(|u| u.underlying_identity())
4611 })
4612 }
4613}
4614
4615/// A wrapper for a weak client and a room id that allows to lazily retrieve a
4616/// room, only when needed.
4617#[derive(Clone, Debug)]
4618pub(crate) struct WeakRoom {
4619 client: WeakClient,
4620 room_id: OwnedRoomId,
4621}
4622
4623impl WeakRoom {
4624 /// Create a new `WeakRoom` given its weak components.
4625 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4626 Self { client, room_id }
4627 }
4628
4629 /// Attempts to reconstruct the room.
4630 pub fn get(&self) -> Option<Room> {
4631 self.client.get().and_then(|client| client.get_room(&self.room_id))
4632 }
4633
4634 /// The room id for that room.
4635 pub fn room_id(&self) -> &RoomId {
4636 &self.room_id
4637 }
4638}
4639
4640/// Details of the (latest) invite.
4641#[derive(Debug, Clone)]
4642pub struct Invite {
4643 /// Who has been invited.
4644 pub invitee: RoomMember,
4645
4646 /// The user ID of who sent the invite.
4647 ///
4648 /// This is useful if `Self::inviter` is `None`.
4649 pub inviter_id: OwnedUserId,
4650
4651 /// Who sent the invite.
4652 ///
4653 /// If `None`, check `Self::inviter_id`, it might be useful as a fallback.
4654 pub inviter: Option<RoomMember>,
4655}
4656
4657#[derive(Error, Debug)]
4658enum InvitationError {
4659 #[error("No membership event found")]
4660 EventMissing,
4661}
4662
4663/// Receipts to send all at once.
4664#[derive(Debug, Clone, Default)]
4665#[non_exhaustive]
4666pub struct Receipts {
4667 /// Fully-read marker (room account data).
4668 pub fully_read: Option<OwnedEventId>,
4669 /// Read receipt (public ephemeral room event).
4670 pub public_read_receipt: Option<OwnedEventId>,
4671 /// Read receipt (private ephemeral room event).
4672 pub private_read_receipt: Option<OwnedEventId>,
4673}
4674
4675impl Receipts {
4676 /// Create an empty `Receipts`.
4677 pub fn new() -> Self {
4678 Self::default()
4679 }
4680
4681 /// Set the last event the user has read.
4682 ///
4683 /// It means that the user has read all the events before this event.
4684 ///
4685 /// This is a private marker only visible by the user.
4686 ///
4687 /// Note that this is technically not a receipt as it is persisted in the
4688 /// room account data.
4689 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4690 self.fully_read = event_id.into();
4691 self
4692 }
4693
4694 /// Set the last event presented to the user and forward it to the other
4695 /// users in the room.
4696 ///
4697 /// This is used to reset the unread messages/notification count and
4698 /// advertise to other users the last event that the user has likely seen.
4699 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4700 self.public_read_receipt = event_id.into();
4701 self
4702 }
4703
4704 /// Set the last event presented to the user and don't forward it.
4705 ///
4706 /// This is used to reset the unread messages/notification count.
4707 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4708 self.private_read_receipt = event_id.into();
4709 self
4710 }
4711
4712 /// Whether this `Receipts` is empty.
4713 pub fn is_empty(&self) -> bool {
4714 self.fully_read.is_none()
4715 && self.public_read_receipt.is_none()
4716 && self.private_read_receipt.is_none()
4717 }
4718}
4719
4720/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
4721/// listed by a room, possibly validated by checking the space's state.
4722#[derive(Debug)]
4723pub enum ParentSpace {
4724 /// The room recognizes the given room as its parent, and the parent
4725 /// recognizes it as its child.
4726 Reciprocal(Room),
4727 /// The room recognizes the given room as its parent, but the parent does
4728 /// not recognizes it as its child. However, the author of the
4729 /// `m.space.parent` event in the room has a sufficient power level in the
4730 /// parent to create the child event.
4731 WithPowerlevel(Room),
4732 /// The room recognizes the given room as its parent, but the parent does
4733 /// not recognizes it as its child.
4734 Illegitimate(Room),
4735 /// The room recognizes the given id as its parent room, but we cannot check
4736 /// whether the parent recognizes it as its child.
4737 Unverifiable(OwnedRoomId),
4738}
4739
4740trait EventSource {
4741 fn get_event(
4742 &self,
4743 event_id: &EventId,
4744 ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4745}
4746
4747impl EventSource for &Room {
4748 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4749 self.load_or_fetch_event(event_id, None).await
4750 }
4751}
4752
4753/// Contains the current user's room member info and the optional room member
4754/// info of the sender of the `m.room.member` event that this info represents.
4755#[derive(Debug)]
4756pub struct RoomMemberWithSenderInfo {
4757 /// The actual room member.
4758 pub room_member: RoomMember,
4759 /// The info of the sender of the event `room_member` is based on, if
4760 /// available.
4761 pub sender_info: Option<RoomMember>,
4762}
4763
4764#[cfg(all(test, not(target_family = "wasm")))]
4765mod tests {
4766 use std::collections::BTreeMap;
4767
4768 use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4769 use matrix_sdk_test::{
4770 JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
4771 };
4772 use ruma::{
4773 RoomVersionId, event_id,
4774 events::{relation::RelationType, room::member::MembershipState},
4775 owned_event_id, room_id, user_id,
4776 };
4777 use wiremock::{
4778 Mock, MockServer, ResponseTemplate,
4779 matchers::{header, method, path_regex},
4780 };
4781
4782 use crate::{
4783 Client,
4784 config::RequestConfig,
4785 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4786 test_utils::{
4787 client::mock_matrix_session,
4788 logged_in_client,
4789 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4790 },
4791 };
4792
4793 #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4794 #[async_test]
4795 async fn test_cache_invalidation_while_encrypt() {
4796 use matrix_sdk_base::store::RoomLoadSettings;
4797 use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4798
4799 let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4800 let session = mock_matrix_session();
4801
4802 let client = Client::builder()
4803 .homeserver_url("http://localhost:1234")
4804 .request_config(RequestConfig::new().disable_retry())
4805 .sqlite_store(&sqlite_path, None)
4806 .build()
4807 .await
4808 .unwrap();
4809 client
4810 .matrix_auth()
4811 .restore_session(session.clone(), RoomLoadSettings::default())
4812 .await
4813 .unwrap();
4814
4815 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4816
4817 // Mock receiving an event to create an internal room.
4818 let server = MockServer::start().await;
4819 {
4820 Mock::given(method("GET"))
4821 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4822 .and(header("authorization", "Bearer 1234"))
4823 .respond_with(
4824 ResponseTemplate::new(200)
4825 .set_body_json(EventFactory::new().room_encryption().into_content()),
4826 )
4827 .mount(&server)
4828 .await;
4829 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4830 let response = SyncResponseBuilder::default()
4831 .add_joined_room(
4832 JoinedRoomBuilder::default()
4833 .add_state_event(
4834 f.member(user_id!("@example:localhost")).display_name("example"),
4835 )
4836 .add_state_event(f.default_power_levels())
4837 .add_state_event(f.room_encryption()),
4838 )
4839 .build_sync_response();
4840 client.base_client().receive_sync_response(response).await.unwrap();
4841 }
4842
4843 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4844
4845 // Step 1, preshare the room keys.
4846 room.preshare_room_key().await.unwrap();
4847
4848 // Step 2, force lock invalidation by pretending another client obtained the
4849 // lock.
4850 {
4851 let client = Client::builder()
4852 .homeserver_url("http://localhost:1234")
4853 .request_config(RequestConfig::new().disable_retry())
4854 .sqlite_store(&sqlite_path, None)
4855 .build()
4856 .await
4857 .unwrap();
4858 client
4859 .matrix_auth()
4860 .restore_session(session.clone(), RoomLoadSettings::default())
4861 .await
4862 .unwrap();
4863 client
4864 .encryption()
4865 .enable_cross_process_store_lock("client2".to_owned())
4866 .await
4867 .unwrap();
4868
4869 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4870 assert!(guard.is_some());
4871 }
4872
4873 // Step 3, take the crypto-store lock.
4874 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4875 assert!(guard.is_some());
4876
4877 // Step 4, try to encrypt a message.
4878 let olm = client.olm_machine().await;
4879 let olm = olm.as_ref().expect("Olm machine wasn't started");
4880
4881 // Now pretend we're encrypting an event; the olm machine shouldn't rely on
4882 // caching the outgoing session before.
4883 let _encrypted_content = olm
4884 .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4885 .await
4886 .unwrap();
4887 }
4888
4889 #[async_test]
4890 async fn test_composer_draft() {
4891 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4892
4893 let client = logged_in_client(None).await;
4894
4895 let response = SyncResponseBuilder::default()
4896 .add_joined_room(JoinedRoomBuilder::default())
4897 .build_sync_response();
4898 client.base_client().receive_sync_response(response).await.unwrap();
4899 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4900
4901 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4902
4903 // Save 2 drafts, one for the room and one for a thread.
4904
4905 let draft = ComposerDraft {
4906 plain_text: "Hello, world!".to_owned(),
4907 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4908 draft_type: ComposerDraftType::NewMessage,
4909 attachments: vec![DraftAttachment {
4910 filename: "cat.txt".to_owned(),
4911 content: matrix_sdk_base::DraftAttachmentContent::File {
4912 data: b"meow".to_vec(),
4913 mimetype: Some("text/plain".to_owned()),
4914 size: Some(5),
4915 },
4916 }],
4917 };
4918
4919 room.save_composer_draft(draft.clone(), None).await.unwrap();
4920
4921 let thread_root = owned_event_id!("$thread_root:b.c");
4922 let thread_draft = ComposerDraft {
4923 plain_text: "Hello, thread!".to_owned(),
4924 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4925 draft_type: ComposerDraftType::NewMessage,
4926 attachments: vec![DraftAttachment {
4927 filename: "dog.txt".to_owned(),
4928 content: matrix_sdk_base::DraftAttachmentContent::File {
4929 data: b"wuv".to_vec(),
4930 mimetype: Some("text/plain".to_owned()),
4931 size: Some(4),
4932 },
4933 }],
4934 };
4935
4936 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4937
4938 // Check that the room draft was saved correctly
4939 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4940
4941 // Check that the thread draft was saved correctly
4942 assert_eq!(
4943 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4944 Some(thread_draft.clone())
4945 );
4946
4947 // Clear the room draft
4948 room.clear_composer_draft(None).await.unwrap();
4949 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4950
4951 // Check that the thread one is still there
4952 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4953
4954 // Clear the thread draft as well
4955 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4956 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4957 }
4958
4959 #[async_test]
4960 async fn test_mark_join_requests_as_seen() {
4961 let server = MatrixMockServer::new().await;
4962 let client = server.client_builder().build().await;
4963 let event_id = event_id!("$a:b.c");
4964 let room_id = room_id!("!a:b.c");
4965 let user_id = user_id!("@alice:b.c");
4966
4967 let f = EventFactory::new().room(room_id);
4968 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4969 f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4970 ]);
4971 let room = server.sync_room(&client, joined_room_builder).await;
4972
4973 // When loading the initial seen ids, there are none
4974 let seen_ids =
4975 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4976 assert!(seen_ids.is_empty());
4977
4978 // We mark a random event id as seen
4979 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4980 .await
4981 .expect("Couldn't mark join request as seen");
4982
4983 // Then we can check it was successfully marked as seen
4984 let seen_ids =
4985 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4986 assert_eq!(seen_ids.len(), 1);
4987 assert_eq!(
4988 seen_ids.into_iter().next().expect("No next value"),
4989 (event_id.to_owned(), user_id.to_owned())
4990 )
4991 }
4992
4993 #[async_test]
4994 async fn test_own_room_membership_with_no_own_member_event() {
4995 let server = MatrixMockServer::new().await;
4996 let client = server.client_builder().build().await;
4997 let room_id = room_id!("!a:b.c");
4998
4999 let room = server.sync_joined_room(&client, room_id).await;
5000
5001 // Since there is no member event for the own user, the method fails.
5002 // This should never happen in an actual room.
5003 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
5004 assert!(error.is_some());
5005 }
5006
5007 #[async_test]
5008 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
5009 let server = MatrixMockServer::new().await;
5010 let client = server.client_builder().build().await;
5011 let room_id = room_id!("!a:b.c");
5012 let user_id = user_id!("@example:localhost");
5013
5014 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5015 let joined_room_builder =
5016 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5017 let room = server.sync_room(&client, joined_room_builder).await;
5018
5019 // When we load the membership details
5020 let ret = room
5021 .member_with_sender_info(client.user_id().unwrap())
5022 .await
5023 .expect("Room member info should be available");
5024
5025 // We get the member info for the current user
5026 assert_eq!(ret.room_member.event().user_id(), user_id);
5027
5028 // But there is no info for the sender
5029 assert!(ret.sender_info.is_none());
5030 }
5031
5032 #[async_test]
5033 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5034 let server = MatrixMockServer::new().await;
5035 let client = server.client_builder().build().await;
5036 let room_id = room_id!("!a:b.c");
5037 let user_id = user_id!("@example:localhost");
5038
5039 let f = EventFactory::new().room(room_id).sender(user_id);
5040 let joined_room_builder =
5041 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5042 let room = server.sync_room(&client, joined_room_builder).await;
5043
5044 // When we load the membership details
5045 let ret = room
5046 .member_with_sender_info(client.user_id().unwrap())
5047 .await
5048 .expect("Room member info should be available");
5049
5050 // We get the current user's member info
5051 assert_eq!(ret.room_member.event().user_id(), user_id);
5052
5053 // And the sender has the same info, since it's also the current user
5054 assert!(ret.sender_info.is_some());
5055 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5056 }
5057
5058 #[async_test]
5059 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5060 let server = MatrixMockServer::new().await;
5061 let client = server.client_builder().build().await;
5062 let room_id = room_id!("!a:b.c");
5063 let user_id = user_id!("@example:localhost");
5064 let sender_id = user_id!("@alice:b.c");
5065
5066 let f = EventFactory::new().room(room_id).sender(sender_id);
5067 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5068 f.member(user_id).into(),
5069 // The sender info comes from the sync
5070 f.member(sender_id).into(),
5071 ]);
5072 let room = server.sync_room(&client, joined_room_builder).await;
5073
5074 // When we load the membership details
5075 let ret = room
5076 .member_with_sender_info(client.user_id().unwrap())
5077 .await
5078 .expect("Room member info should be available");
5079
5080 // We get the current user's member info
5081 assert_eq!(ret.room_member.event().user_id(), user_id);
5082
5083 // And also the sender info from the events received in the sync
5084 assert!(ret.sender_info.is_some());
5085 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5086 }
5087
5088 #[async_test]
5089 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5090 let server = MatrixMockServer::new().await;
5091 let client = server.client_builder().build().await;
5092 let room_id = room_id!("!a:b.c");
5093 let user_id = user_id!("@example:localhost");
5094 let sender_id = user_id!("@alice:b.c");
5095
5096 let f = EventFactory::new().room(room_id).sender(sender_id);
5097 let joined_room_builder =
5098 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5099 let room = server.sync_room(&client, joined_room_builder).await;
5100
5101 // We'll receive the member info through the /members endpoint
5102 server
5103 .mock_get_members()
5104 .ok(vec![f.member(sender_id).into_raw()])
5105 .mock_once()
5106 .mount()
5107 .await;
5108
5109 // We get the current user's member info
5110 let ret = room
5111 .member_with_sender_info(client.user_id().unwrap())
5112 .await
5113 .expect("Room member info should be available");
5114
5115 // We get the current user's member info
5116 assert_eq!(ret.room_member.event().user_id(), user_id);
5117
5118 // And also the sender info from the /members endpoint
5119 assert!(ret.sender_info.is_some());
5120 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5121 }
5122
5123 #[async_test]
5124 async fn test_list_threads() {
5125 let server = MatrixMockServer::new().await;
5126 let client = server.client_builder().build().await;
5127
5128 let room_id = room_id!("!a:b.c");
5129 let sender_id = user_id!("@alice:b.c");
5130 let f = EventFactory::new().room(room_id).sender(sender_id);
5131
5132 let eid1 = event_id!("$1");
5133 let eid2 = event_id!("$2");
5134 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5135 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5136
5137 server
5138 .mock_room_threads()
5139 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5140 .mock_once()
5141 .mount()
5142 .await;
5143 server
5144 .mock_room_threads()
5145 .match_from("prev_batch")
5146 .ok(batch2, None)
5147 .mock_once()
5148 .mount()
5149 .await;
5150
5151 let room = server.sync_joined_room(&client, room_id).await;
5152 let result =
5153 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5154 assert_eq!(result.chunk.len(), 1);
5155 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5156 assert!(result.prev_batch_token.is_some());
5157
5158 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5159 let result = room.list_threads(opts).await.expect("Failed to list threads");
5160 assert_eq!(result.chunk.len(), 1);
5161 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5162 assert!(result.prev_batch_token.is_none());
5163 }
5164
5165 #[async_test]
5166 async fn test_relations() {
5167 let server = MatrixMockServer::new().await;
5168 let client = server.client_builder().build().await;
5169
5170 let room_id = room_id!("!a:b.c");
5171 let sender_id = user_id!("@alice:b.c");
5172 let f = EventFactory::new().room(room_id).sender(sender_id);
5173
5174 let target_event_id = owned_event_id!("$target");
5175 let eid1 = event_id!("$1");
5176 let eid2 = event_id!("$2");
5177 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5178 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5179
5180 server
5181 .mock_room_relations()
5182 .match_target_event(target_event_id.clone())
5183 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5184 .mock_once()
5185 .mount()
5186 .await;
5187
5188 server
5189 .mock_room_relations()
5190 .match_target_event(target_event_id.clone())
5191 .match_from("next_batch")
5192 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5193 .mock_once()
5194 .mount()
5195 .await;
5196
5197 let room = server.sync_joined_room(&client, room_id).await;
5198
5199 // Main endpoint: no relation type filtered out.
5200 let mut opts = RelationsOptions {
5201 include_relations: IncludeRelations::AllRelations,
5202 ..Default::default()
5203 };
5204 let result = room
5205 .relations(target_event_id.clone(), opts.clone())
5206 .await
5207 .expect("Failed to list relations the first time");
5208 assert_eq!(result.chunk.len(), 1);
5209 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5210 assert!(result.prev_batch_token.is_none());
5211 assert!(result.next_batch_token.is_some());
5212 assert!(result.recursion_depth.is_none());
5213
5214 opts.from = result.next_batch_token;
5215 let result = room
5216 .relations(target_event_id, opts)
5217 .await
5218 .expect("Failed to list relations the second time");
5219 assert_eq!(result.chunk.len(), 1);
5220 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5221 assert!(result.prev_batch_token.is_none());
5222 assert!(result.next_batch_token.is_none());
5223 assert!(result.recursion_depth.is_none());
5224 }
5225
5226 #[async_test]
5227 async fn test_relations_with_reltype() {
5228 let server = MatrixMockServer::new().await;
5229 let client = server.client_builder().build().await;
5230
5231 let room_id = room_id!("!a:b.c");
5232 let sender_id = user_id!("@alice:b.c");
5233 let f = EventFactory::new().room(room_id).sender(sender_id);
5234
5235 let target_event_id = owned_event_id!("$target");
5236 let eid1 = event_id!("$1");
5237 let eid2 = event_id!("$2");
5238 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5239 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5240
5241 server
5242 .mock_room_relations()
5243 .match_target_event(target_event_id.clone())
5244 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5245 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5246 .mock_once()
5247 .mount()
5248 .await;
5249
5250 server
5251 .mock_room_relations()
5252 .match_target_event(target_event_id.clone())
5253 .match_from("next_batch")
5254 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5255 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5256 .mock_once()
5257 .mount()
5258 .await;
5259
5260 let room = server.sync_joined_room(&client, room_id).await;
5261
5262 // Reltype-filtered endpoint, for threads \o/
5263 let mut opts = RelationsOptions {
5264 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5265 ..Default::default()
5266 };
5267 let result = room
5268 .relations(target_event_id.clone(), opts.clone())
5269 .await
5270 .expect("Failed to list relations the first time");
5271 assert_eq!(result.chunk.len(), 1);
5272 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5273 assert!(result.prev_batch_token.is_none());
5274 assert!(result.next_batch_token.is_some());
5275 assert!(result.recursion_depth.is_none());
5276
5277 opts.from = result.next_batch_token;
5278 let result = room
5279 .relations(target_event_id, opts)
5280 .await
5281 .expect("Failed to list relations the second time");
5282 assert_eq!(result.chunk.len(), 1);
5283 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5284 assert!(result.prev_batch_token.is_none());
5285 assert!(result.next_batch_token.is_none());
5286 assert!(result.recursion_depth.is_none());
5287 }
5288
5289 #[async_test]
5290 async fn test_power_levels_computation() {
5291 let server = MatrixMockServer::new().await;
5292 let client = server.client_builder().build().await;
5293
5294 let room_id = room_id!("!a:b.c");
5295 let sender_id = client.user_id().expect("No session id");
5296 let f = EventFactory::new().room(room_id).sender(sender_id);
5297 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5298
5299 // Computing the power levels will need these 3 state events:
5300 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5301 let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5302 let room_member_event = f.member(sender_id).into();
5303
5304 // With only the room member event
5305 let room = server
5306 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5307 .await;
5308 let ctx = room
5309 .push_condition_room_ctx()
5310 .await
5311 .expect("Failed to get push condition context")
5312 .expect("Could not get push condition context");
5313
5314 // The internal power levels couldn't be computed
5315 assert!(ctx.power_levels.is_none());
5316
5317 // Adding the room creation event
5318 let room = server
5319 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5320 .await;
5321 let ctx = room
5322 .push_condition_room_ctx()
5323 .await
5324 .expect("Failed to get push condition context")
5325 .expect("Could not get push condition context");
5326
5327 // The internal power levels still couldn't be computed
5328 assert!(ctx.power_levels.is_none());
5329
5330 // With the room member, room creation and the power levels events
5331 let room = server
5332 .sync_room(
5333 &client,
5334 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5335 )
5336 .await;
5337 let ctx = room
5338 .push_condition_room_ctx()
5339 .await
5340 .expect("Failed to get push condition context")
5341 .expect("Could not get push condition context");
5342
5343 // The internal power levels can finally be computed
5344 assert!(ctx.power_levels.is_some());
5345 }
5346}