matrix_sdk/room/mod.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39 IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44 StateChanges, StateStoreDataKey, StateStoreDataValue,
45 deserialized_responses::{
46 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47 },
48 media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49 serde_helpers::extract_relation,
50 store::{StateStoreExt, ThreadSubscriptionStatus},
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
54#[cfg(feature = "e2e-encryption")]
55use matrix_sdk_common::BoxFuture;
56use matrix_sdk_common::{
57 deserialized_responses::TimelineEvent,
58 executor::{JoinHandle, spawn},
59 timeout::timeout,
60};
61#[cfg(feature = "experimental-search")]
62use matrix_sdk_search::error::IndexError;
63#[cfg(feature = "experimental-search")]
64#[cfg(doc)]
65use matrix_sdk_search::index::RoomIndex;
66use mime::Mime;
67use reply::Reply;
68#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
69use ruma::events::AnySyncMessageLikeEvent;
70#[cfg(feature = "experimental-encrypted-state-events")]
71use ruma::events::AnySyncStateEvent;
72#[cfg(feature = "unstable-msc4274")]
73use ruma::events::room::message::GalleryItemType;
74#[cfg(feature = "e2e-encryption")]
75use ruma::events::{
76 AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
77};
78use ruma::{
79 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
80 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
81 api::{
82 client::{
83 config::{set_global_account_data, set_room_account_data},
84 context,
85 filter::LazyLoadOptions,
86 membership::{
87 Invite3pid, ban_user, forget_room, get_member_events,
88 invite_user::{
89 self,
90 v3::{InvitationRecipient, InviteUserId},
91 },
92 kick_user, leave_room, unban_user,
93 },
94 message::send_message_event,
95 read_marker::set_read_marker,
96 receipt::create_receipt,
97 redact::redact_event,
98 room::{get_room_event, report_content, report_room},
99 state::{get_state_event_for_key, send_state_event},
100 tag::{create_tag, delete_tag},
101 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
102 typing::create_typing_event::{
103 self,
104 v3::{Typing, TypingInfo},
105 },
106 },
107 error::ErrorKind,
108 },
109 assign,
110 events::{
111 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
112 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
113 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
114 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
115 StaticStateEventContent, SyncStateEvent,
116 beacon::BeaconEventContent,
117 beacon_info::BeaconInfoEventContent,
118 direct::DirectEventContent,
119 marked_unread::MarkedUnreadEventContent,
120 receipt::{Receipt, ReceiptThread, ReceiptType},
121 relation::RelationType,
122 room::{
123 ImageInfo, MediaSource, ThumbnailInfo,
124 avatar::{self, RoomAvatarEventContent},
125 encryption::PossiblyRedactedRoomEncryptionEventContent,
126 history_visibility::HistoryVisibility,
127 member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
128 message::{
129 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
130 ImageMessageEventContent, MessageType, RoomMessageEventContent,
131 TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
132 UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
133 },
134 name::RoomNameEventContent,
135 pinned_events::RoomPinnedEventsEventContent,
136 power_levels::{
137 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
138 },
139 server_acl::RoomServerAclEventContent,
140 topic::RoomTopicEventContent,
141 },
142 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
143 tag::{TagInfo, TagName},
144 typing::SyncTypingEvent,
145 },
146 int,
147 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
148 serde::Raw,
149 time::Instant,
150 uint,
151};
152#[cfg(feature = "experimental-encrypted-state-events")]
153use ruma::{
154 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
155 serde::JsonCastable,
156};
157use serde::de::DeserializeOwned;
158use thiserror::Error;
159use tokio::{join, sync::broadcast};
160use tracing::{debug, error, info, instrument, trace, warn};
161
162use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
163pub use self::{
164 member::{RoomMember, RoomMemberRole},
165 messages::{
166 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
167 Relations, RelationsOptions, ThreadRoots,
168 },
169};
170#[cfg(feature = "e2e-encryption")]
171use crate::encryption::backups::BackupState;
172#[cfg(doc)]
173use crate::event_cache::EventCache;
174#[cfg(feature = "experimental-encrypted-state-events")]
175use crate::room::futures::{SendRawStateEvent, SendStateEvent};
176use crate::{
177 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
178 attachment::{AttachmentConfig, AttachmentInfo},
179 client::WeakClient,
180 config::RequestConfig,
181 error::{BeaconError, WrongRoomState},
182 event_cache::{self, EventCacheDropHandles, RoomEventCache},
183 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
184 live_location_share::ObservableLiveLocation,
185 media::{MediaFormat, MediaRequestParameters},
186 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
187 room::{
188 knock_requests::{KnockRequest, KnockRequestMemberInfo},
189 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
190 privacy_settings::RoomPrivacySettings,
191 },
192 sync::RoomUpdate,
193 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
194};
195
196pub mod edit;
197pub mod futures;
198pub mod identity_status_changes;
199/// Contains code related to requests to join a room.
200pub mod knock_requests;
201mod member;
202mod messages;
203pub mod power_levels;
204pub mod reply;
205
206pub mod calls;
207
208/// Contains all the functionality for modifying the privacy settings in a room.
209pub mod privacy_settings;
210
211#[cfg(feature = "e2e-encryption")]
212pub(crate) mod shared_room_history;
213
/// A struct containing methods that are common for Joined, Invited and Left
/// Rooms.
///
/// It pairs the underlying [`BaseRoom`] with the [`Client`] the room belongs
/// to.
#[derive(Debug, Clone)]
pub struct Room {
    /// The underlying room.
    inner: BaseRoom,
    /// The client used to make requests on behalf of this room.
    pub(crate) client: Client,
}
221
222impl Deref for Room {
223 type Target = BaseRoom;
224
225 fn deref(&self) -> &Self::Target {
226 &self.inner
227 }
228}
229
// NOTE(review): neither constant is used in the visible part of this file;
// the names suggest they pace outgoing typing notices — confirm at the use
// sites.

/// Timeout associated with a typing notice: 4 seconds.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
/// Timeout associated with re-sending a typing notice: 3 seconds.
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
232
/// A thread subscription, according to the semantics of MSC4306.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadSubscription {
    /// Whether the subscription was made automatically by a client, rather
    /// than by an explicit choice of the user.
    pub automatic: bool,
}
240
/// Context that makes it possible to compute the push actions for a given
/// event.
#[derive(Debug)]
pub struct PushContext {
    /// The Ruma context used to compute the push actions.
    push_condition_room_ctx: PushConditionRoomCtx,

    /// Push rules for this room, based on the push rules state event, or the
    /// global server default as defined by [`Ruleset::server_default`].
    push_rules: Ruleset,
}
251
252impl PushContext {
253 /// Create a new [`PushContext`] from its inner components.
254 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
255 Self { push_condition_room_ctx, push_rules }
256 }
257
258 /// Compute the push rules for a given event.
259 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
260 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
261 }
262
263 /// Compute the push rules for a given event, with extra logging to help
264 /// debugging.
265 #[doc(hidden)]
266 #[instrument(skip_all)]
267 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
268 let rules = self
269 .push_rules
270 .iter()
271 .filter_map(|r| {
272 if !r.enabled() {
273 return None;
274 }
275
276 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
277
278 let conditions = match r {
279 AnyPushRuleRef::Override(r) => {
280 format!("{:?}", r.conditions)
281 }
282 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
283 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
284 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
285 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
286 _ => "<unknown push rule kind>".to_owned(),
287 };
288
289 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
290 })
291 .collect::<Vec<_>>()
292 .join("\n");
293 trace!("rules:\n\n{rules}\n\n");
294
295 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
296
297 if let Some(found) = found {
298 trace!("rule {} matched", found.rule_id());
299 found.actions().to_owned()
300 } else {
301 trace!("no match");
302 Vec::new()
303 }
304 }
305}
306
// Builds the `$t` media variant (image, audio, video or generic file) that
// corresponds to the top-level type of `$content_type`, filling in the
// caption, the media info and the thumbnail.
macro_rules! make_media_type {
    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
        // With a caption, the caption becomes the body and the filename is sent
        // separately; without one, the filename is the body and no separate
        // filename is set.
        // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
        let (body, formatted, filename) =
            if let Some(TextMessageEventContent { body, formatted, .. }) = $caption {
                (body, formatted, Some($filename))
            } else {
                ($filename, None, None)
            };

        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();

        match $content_type.type_() {
            mime::IMAGE => {
                let image_info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let image = assign!(ImageMessageEventContent::new(body, $source), {
                    info: Some(Box::new(image_info)),
                    formatted,
                    filename
                });
                <$t>::Image(image)
            }

            mime::AUDIO => {
                let mut audio = assign!(AudioMessageEventContent::new(body, $source), {
                    formatted,
                    filename
                });

                // The audio details block is only attached when both a duration
                // and a waveform are available.
                if let Some(AttachmentInfo::Audio(details) | AttachmentInfo::Voice(details)) = &$info
                    && let Some(duration) = details.duration
                    && let Some(samples) = &details.waveform
                {
                    let waveform = samples
                        .iter()
                        .map(|sample| {
                            // Clamp to [0, 1], then scale to the amplitude range.
                            let scaled = (*sample).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32;
                            scaled as u16
                        })
                        .map(Into::into)
                        .collect();
                    audio.audio = Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
                }

                if matches!($info, Some(AttachmentInfo::Voice(_))) {
                    audio.voice = Some(UnstableVoiceContentBlock::new());
                }

                // `$info` is consumed here, so this must come after the borrows above.
                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
                audio_info.mimetype = Some($content_type.as_ref().to_owned());

                <$t>::Audio(audio.info(Box::new(audio_info)))
            }

            mime::VIDEO => {
                let video_info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let video = assign!(VideoMessageEventContent::new(body, $source), {
                    info: Some(Box::new(video_info)),
                    formatted,
                    filename
                });
                <$t>::Video(video)
            }

            // Anything else is sent as a generic file.
            _ => {
                let file_info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let file = assign!(FileMessageEventContent::new(body, $source), {
                    info: Some(Box::new(file_info)),
                    formatted,
                    filename,
                });
                <$t>::File(file)
            }
        }
    }};
}
392
393impl Room {
394 /// Create a new `Room`
395 ///
396 /// # Arguments
397 /// * `client` - The client used to make requests.
398 ///
399 /// * `room` - The underlying room.
400 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
401 Self { inner: room, client }
402 }
403
404 /// Leave this room.
405 /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
406 /// automatically.
407 ///
408 /// Only invited and joined rooms can be left.
409 #[doc(alias = "reject_invitation")]
410 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
411 async fn leave_impl(&self) -> (Result<()>, &Room) {
412 let state = self.state();
413 if state == RoomState::Left {
414 return (
415 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
416 "Joined or Invited",
417 state,
418 )))),
419 self,
420 );
421 }
422
423 // If the room was in Invited state we should also forget it when declining the
424 // invite.
425 let should_forget = matches!(self.state(), RoomState::Invited);
426
427 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
428 let response = self.client.send(request).await;
429
430 // The server can return with an error that is acceptable to ignore. Let's find
431 // which one.
432 if let Err(error) = response {
433 #[allow(clippy::collapsible_match)]
434 let ignore_error = if let Some(error) = error.client_api_error_kind() {
435 match error {
436 // The user is trying to leave a room but doesn't have permissions to do so.
437 // Let's consider the user has left the room.
438 ErrorKind::Forbidden => true,
439 _ => false,
440 }
441 } else {
442 false
443 };
444
445 error!(?error, ignore_error, should_forget, "Failed to leave the room");
446
447 if !ignore_error {
448 return (Err(error.into()), self);
449 }
450 }
451
452 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
453 return (Err(e.into()), self);
454 }
455
456 if should_forget {
457 trace!("Trying to forget the room");
458
459 if let Err(error) = self.forget().await {
460 error!(?error, "Failed to forget the room");
461 }
462 }
463
464 (Ok(()), self)
465 }
466
467 /// Leave this room and all predecessors.
468 /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
469 /// automatically.
470 ///
471 /// Only invited and joined rooms can be left.
472 /// Will return an error if the current room fails to leave but
473 /// will only warn if a predecessor fails to leave.
474 pub async fn leave(&self) -> Result<()> {
475 let mut rooms: Vec<Room> = vec![self.clone()];
476 let mut current_room = self;
477
478 while let Some(predecessor) = current_room.predecessor_room() {
479 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
480
481 if let Some(predecessor_room) = maybe_predecessor_room {
482 rooms.push(predecessor_room.clone());
483 current_room = rooms.last().expect("Room just pushed so can't be empty");
484 } else {
485 warn!("Cannot find predecessor room");
486 break;
487 }
488 }
489
490 let batch_size = 5;
491
492 let rooms_futures: Vec<_> = rooms
493 .iter()
494 .filter_map(|room| match room.state() {
495 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
496 Some(room.leave_impl())
497 }
498 RoomState::Banned | RoomState::Left => None,
499 })
500 .collect();
501
502 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
503
504 let mut maybe_this_room_failed_with: Option<Error> = None;
505
506 while let Some(result) = futures_stream.next().await {
507 if let (Err(e), room) = result {
508 if room.room_id() == self.room_id() {
509 maybe_this_room_failed_with = Some(e);
510 } else {
511 warn!("Failure while attempting to leave predecessor room: {e:?}");
512 }
513 }
514 }
515
516 maybe_this_room_failed_with.map_or(Ok(()), Err)
517 }
518
519 /// Join this room.
520 ///
521 /// Only invited and left rooms can be joined via this method.
522 #[doc(alias = "accept_invitation")]
523 pub async fn join(&self) -> Result<()> {
524 let prev_room_state = self.inner.state();
525
526 if prev_room_state == RoomState::Joined {
527 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
528 "Invited or Left",
529 prev_room_state,
530 ))));
531 }
532
533 self.client.join_room_by_id(self.room_id()).await?;
534
535 Ok(())
536 }
537
538 /// Get the inner client saved in this room instance.
539 ///
540 /// Returns the client this room is part of.
541 pub fn client(&self) -> Client {
542 self.client.clone()
543 }
544
545 /// Get the sync state of this room, i.e. whether it was fully synced with
546 /// the server.
547 pub fn is_synced(&self) -> bool {
548 self.inner.is_state_fully_synced()
549 }
550
551 /// Gets the avatar of this room, if set.
552 ///
553 /// Returns the avatar.
554 /// If a thumbnail is requested no guarantee on the size of the image is
555 /// given.
556 ///
557 /// # Arguments
558 ///
559 /// * `format` - The desired format of the avatar.
560 ///
561 /// # Examples
562 ///
563 /// ```no_run
564 /// # use matrix_sdk::Client;
565 /// # use matrix_sdk::ruma::room_id;
566 /// # use matrix_sdk::media::MediaFormat;
567 /// # use url::Url;
568 /// # let homeserver = Url::parse("http://example.com").unwrap();
569 /// # async {
570 /// # let user = "example";
571 /// let client = Client::new(homeserver).await.unwrap();
572 /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
573 /// let room_id = room_id!("!roomid:example.com");
574 /// let room = client.get_room(&room_id).unwrap();
575 /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
576 /// std::fs::write("avatar.png", avatar);
577 /// }
578 /// # };
579 /// ```
580 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
581 let Some(url) = self.avatar_url() else { return Ok(None) };
582 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
583 Ok(Some(self.client.media().get_media_content(&request, true).await?))
584 }
585
586 /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
587 /// returns a `Messages` struct that contains a chunk of room and state
588 /// events (`RoomEvent` and `AnyStateEvent`).
589 ///
590 /// With the encryption feature, messages are decrypted if possible. If
591 /// decryption fails for an individual message, that message is returned
592 /// undecrypted.
593 ///
594 /// # Examples
595 ///
596 /// ```no_run
597 /// use matrix_sdk::{Client, room::MessagesOptions};
598 /// # use matrix_sdk::ruma::{
599 /// # api::client::filter::RoomEventFilter,
600 /// # room_id,
601 /// # };
602 /// # use url::Url;
603 ///
604 /// # let homeserver = Url::parse("http://example.com").unwrap();
605 /// # async {
606 /// let options =
607 /// MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
608 ///
609 /// let mut client = Client::new(homeserver).await.unwrap();
610 /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
611 /// assert!(room.messages(options).await.is_ok());
612 /// # };
613 /// ```
614 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
615 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
616 let room_id = self.inner.room_id();
617 let request = options.into_request(room_id);
618 let http_response = self.client.send(request).await?;
619
620 let push_ctx = self.push_context().await?;
621 let chunk = join_all(
622 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
623 )
624 .await;
625
626 // Save the loaded events into the event cache, if it's set up.
627 if let Ok((cache, _handles)) = self.event_cache().await {
628 cache.save_events(chunk.clone()).await;
629 }
630
631 Ok(Messages {
632 start: http_response.start,
633 end: http_response.end,
634 chunk,
635 state: http_response.state,
636 })
637 }
638
639 /// Register a handler for events of a specific type, within this room.
640 ///
641 /// This method works the same way as [`Client::add_event_handler`], except
642 /// that the handler will only be called for events within this room. See
643 /// that method for more details on event handler functions.
644 ///
645 /// `room.add_event_handler(hdl)` is equivalent to
646 /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
647 /// convenient in your use case.
648 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
649 where
650 Ev: SyncEvent + DeserializeOwned + Send + 'static,
651 H: EventHandler<Ev, Ctx>,
652 {
653 self.client.add_room_event_handler(self.room_id(), handler)
654 }
655
656 /// Subscribe to all updates for this room.
657 ///
658 /// The returned receiver will receive a new message for each sync response
659 /// that contains updates for this room.
660 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
661 self.client.subscribe_to_room_updates(self.room_id())
662 }
663
664 /// Subscribe to typing notifications for this room.
665 ///
666 /// The returned receiver will receive a new vector of user IDs for each
667 /// sync response that contains 'm.typing' event. The current user ID will
668 /// be filtered out.
669 pub fn subscribe_to_typing_notifications(
670 &self,
671 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
672 let (sender, receiver) = broadcast::channel(16);
673 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
674 let own_user_id = self.own_user_id().to_owned();
675 move |event: SyncTypingEvent| async move {
676 // Ignore typing notifications from own user.
677 let typing_user_ids = event
678 .content
679 .user_ids
680 .into_iter()
681 .filter(|user_id| *user_id != own_user_id)
682 .collect();
683 // Ignore the result. It can only fail if there are no listeners.
684 let _ = sender.send(typing_user_ids);
685 }
686 });
687 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
688 (drop_guard, receiver)
689 }
690
691 /// Subscribe to updates about users who are in "pin violation" i.e. their
692 /// identity has changed and the user has not yet acknowledged this.
693 ///
694 /// The returned receiver will receive a new vector of
695 /// [`IdentityStatusChange`] each time a /keys/query response shows a
696 /// changed identity for a member of this room, or a sync shows a change
697 /// to the membership of an affected user. (Changes to the current user are
698 /// not directly included, but some changes to the current user's identity
699 /// can trigger changes to how we see other users' identities, which
700 /// will be included.)
701 ///
702 /// The first item in the stream provides the current state of the room:
703 /// each member of the room who is not in "pinned" or "verified" state will
704 /// be included (except the current user).
705 ///
706 /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
707 /// `PinViolation` then a warning should be displayed to the user. If it is
708 /// set to `Pinned` then no warning should be displayed.
709 ///
710 /// Note that if a user who is in pin violation leaves the room, a `Pinned`
711 /// update is sent, to indicate that the warning should be removed, even
712 /// though the user's identity is not necessarily pinned.
713 #[cfg(feature = "e2e-encryption")]
714 pub async fn subscribe_to_identity_status_changes(
715 &self,
716 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
717 IdentityStatusChanges::create_stream(self.clone()).await
718 }
719
720 /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
721 /// decrypted if needs be.
722 ///
723 /// Only logs from the crypto crate will indicate a failure to decrypt.
724 #[cfg(not(feature = "experimental-encrypted-state-events"))]
725 #[allow(clippy::unused_async)] // Used only in e2e-encryption.
726 async fn try_decrypt_event(
727 &self,
728 event: Raw<AnyTimelineEvent>,
729 push_ctx: Option<&PushContext>,
730 ) -> TimelineEvent {
731 #[cfg(feature = "e2e-encryption")]
732 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
733 SyncMessageLikeEvent::Original(_),
734 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
735 && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
736 {
737 return event;
738 }
739
740 let mut event = TimelineEvent::from_plaintext(event.cast());
741 if let Some(push_ctx) = push_ctx {
742 event.set_push_actions(push_ctx.for_event(event.raw()).await);
743 }
744
745 event
746 }
747
748 /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
749 /// decrypted if needs be.
750 ///
751 /// Only logs from the crypto crate will indicate a failure to decrypt.
752 #[cfg(feature = "experimental-encrypted-state-events")]
753 #[allow(clippy::unused_async)] // Used only in e2e-encryption.
754 async fn try_decrypt_event(
755 &self,
756 event: Raw<AnyTimelineEvent>,
757 push_ctx: Option<&PushContext>,
758 ) -> TimelineEvent {
759 // If we have either an encrypted message-like or state event, try to decrypt.
760 match event.deserialize_as::<AnySyncTimelineEvent>() {
761 Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
762 SyncMessageLikeEvent::Original(_),
763 ))) => {
764 if let Ok(event) = self
765 .decrypt_event(
766 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
767 push_ctx,
768 )
769 .await
770 {
771 return event;
772 }
773 }
774 Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
775 SyncStateEvent::Original(_),
776 ))) => {
777 if let Ok(event) = self
778 .decrypt_event(
779 event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
780 push_ctx,
781 )
782 .await
783 {
784 return event;
785 }
786 }
787 _ => {}
788 }
789
790 let mut event = TimelineEvent::from_plaintext(event.cast());
791 if let Some(push_ctx) = push_ctx {
792 event.set_push_actions(push_ctx.for_event(event.raw()).await);
793 }
794
795 event
796 }
797
798 /// Fetch the event with the given `EventId` in this room.
799 ///
800 /// It uses the given [`RequestConfig`] if provided, or the client's default
801 /// one otherwise.
802 pub async fn event(
803 &self,
804 event_id: &EventId,
805 request_config: Option<RequestConfig>,
806 ) -> Result<TimelineEvent> {
807 let request =
808 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
809
810 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
811 let push_ctx = self.push_context().await?;
812 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
813
814 // Save the event into the event cache, if it's set up.
815 if let Ok((cache, _handles)) = self.event_cache().await {
816 cache.save_events([event.clone()]).await;
817 }
818
819 Ok(event)
820 }
821
822 /// Try to load the event from the [`EventCache`][crate::event_cache], if
823 /// it's enabled, or fetch it from the homeserver.
824 ///
825 /// When running the request against the homeserver, it uses the given
826 /// [`RequestConfig`] if provided, or the client's default one
827 /// otherwise.
828 pub async fn load_or_fetch_event(
829 &self,
830 event_id: &EventId,
831 request_config: Option<RequestConfig>,
832 ) -> Result<TimelineEvent> {
833 match self.event_cache().await {
834 Ok((event_cache, _drop_handles)) => {
835 if let Some(event) = event_cache.find_event(event_id).await? {
836 return Ok(event);
837 }
838 // Fallthrough: try with a request.
839 }
840 Err(err) => {
841 debug!("error when getting the event cache: {err}");
842 }
843 }
844 self.event(event_id, request_config).await
845 }
846
847 /// Try to load the event and its relations from the
848 /// [`EventCache`][crate::event_cache], if it's enabled, or fetch it
849 /// from the homeserver.
850 ///
851 /// You can control which types of related events are retrieved using
852 /// `filter`. A `None` value will retrieve any type of related event.
853 ///
854 /// If the event is found in the event cache, but we can't find any
855 /// relations for it there, then we will still attempt to fetch the
856 /// relations from the homeserver.
857 ///
858 /// When running any request against the homeserver, it uses the given
859 /// [`RequestConfig`] if provided, or the client's default one
860 /// otherwise.
861 ///
862 /// Returns a tuple formed of the event and a vector of its relations (that
863 /// can be empty).
864 pub async fn load_or_fetch_event_with_relations(
865 &self,
866 event_id: &EventId,
867 filter: Option<Vec<RelationType>>,
868 request_config: Option<RequestConfig>,
869 ) -> Result<(TimelineEvent, Vec<TimelineEvent>)> {
870 let fetch_relations = async || {
871 // If there's only a single filter, we can use a more efficient request,
872 // specialized on the filter type.
873 //
874 // Otherwise, we need to get all the relations:
875 // - either because no filters implies we fetch all relations,
876 // - or because there are multiple filters and we must filter out manually.
877 let include_relations = if let Some(filter) = &filter
878 && filter.len() == 1
879 {
880 IncludeRelations::RelationsOfType(filter[0].clone())
881 } else {
882 IncludeRelations::AllRelations
883 };
884
885 let mut opts = RelationsOptions {
886 include_relations,
887 recurse: true,
888 limit: Some(uint!(256)),
889 ..Default::default()
890 };
891
892 let mut events = Vec::new();
893 loop {
894 match self.relations(event_id.to_owned(), opts.clone()).await {
895 Ok(relations) => {
896 if let Some(filter) = filter.as_ref() {
897 // Manually filter out the relation types we're interested in.
898 events.extend(relations.chunk.into_iter().filter_map(|ev| {
899 let (rel_type, _) = extract_relation(ev.raw())?;
900 filter
901 .iter()
902 .any(|ruma_filter| ruma_filter == &rel_type)
903 .then_some(ev)
904 }));
905 } else {
906 // No filter: include all events from the response.
907 events.extend(relations.chunk);
908 }
909
910 if let Some(next_from) = relations.next_batch_token {
911 opts.from = Some(next_from);
912 } else {
913 break events;
914 }
915 }
916
917 Err(err) => {
918 warn!(%event_id, "error when loading relations of pinned event from server: {err}");
919 break events;
920 }
921 }
922 }
923 };
924
925 // First, try to load the event *and* its relations from the event cache, all at
926 // once.
927 let event_cache = match self.event_cache().await {
928 Ok((event_cache, drop_handles)) => {
929 if let Some((event, mut relations)) =
930 event_cache.find_event_with_relations(event_id, filter.clone()).await?
931 {
932 if relations.is_empty() {
933 // The event cache doesn't have any relations for this event, try to fetch
934 // them from the server instead.
935 relations = fetch_relations().await;
936 }
937
938 return Ok((event, relations));
939 }
940
941 // Otherwise, get the event from the server.
942 Some((event_cache, drop_handles))
943 }
944
945 Err(err) => {
946 debug!("error when getting the event cache: {err}");
947 // Fallthrough: try with a request.
948 None
949 }
950 };
951
952 // Fetch the event from the server. A failure here is fatal, as we must return
953 // the target event.
954 let event = self.event(event_id, request_config).await?;
955
956 // Try to get the relations from the event cache (if we have one).
957 if let Some((event_cache, _drop_handles)) = event_cache
958 && let Some(relations) =
959 event_cache.find_event_relations(event_id, filter.clone()).await.ok()
960 && !relations.is_empty()
961 {
962 return Ok((event, relations));
963 }
964
965 // We couldn't find the relations in the event cache; fetch them from the
966 // server.
967 Ok((event, fetch_relations().await))
968 }
969
970 /// Fetch the event with the given `EventId` in this room, using the
971 /// `/context` endpoint to get more information.
972 pub async fn event_with_context(
973 &self,
974 event_id: &EventId,
975 lazy_load_members: bool,
976 context_size: UInt,
977 request_config: Option<RequestConfig>,
978 ) -> Result<EventWithContextResponse> {
979 let mut request =
980 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
981
982 request.limit = context_size;
983
984 if lazy_load_members {
985 request.filter.lazy_load_options =
986 LazyLoadOptions::Enabled { include_redundant_members: false };
987 }
988
989 let response = self.client.send(request).with_request_config(request_config).await?;
990
991 let push_ctx = self.push_context().await?;
992 let push_ctx = push_ctx.as_ref();
993 let target_event = if let Some(event) = response.event {
994 Some(self.try_decrypt_event(event, push_ctx).await)
995 } else {
996 None
997 };
998
999 // Note: the joined future will fail if any future failed, but
1000 // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
1001 // decryption error, so we should prevent against most bad cases here.
1002 let (events_before, events_after) = join!(
1003 join_all(
1004 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1005 ),
1006 join_all(
1007 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1008 ),
1009 );
1010
1011 // Save the loaded events into the event cache, if it's set up.
1012 if let Ok((cache, _handles)) = self.event_cache().await {
1013 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
1014 if let Some(event) = &target_event {
1015 events_to_save.push(event.clone());
1016 }
1017
1018 for event in &events_before {
1019 events_to_save.push(event.clone());
1020 }
1021
1022 for event in &events_after {
1023 events_to_save.push(event.clone());
1024 }
1025
1026 cache.save_events(events_to_save).await;
1027 }
1028
1029 Ok(EventWithContextResponse {
1030 event: target_event,
1031 events_before,
1032 events_after,
1033 state: response.state,
1034 prev_batch_token: response.start,
1035 next_batch_token: response.end,
1036 })
1037 }
1038
1039 pub(crate) async fn request_members(&self) -> Result<()> {
1040 self.client
1041 .locks()
1042 .members_request_deduplicated_handler
1043 .run(self.room_id().to_owned(), async move {
1044 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
1045 let response = self
1046 .client
1047 .send(request.clone())
1048 .with_request_config(
1049 // In some cases it can take longer than 30s to load:
1050 // https://github.com/element-hq/synapse/issues/16872
1051 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
1052 )
1053 .await?;
1054
1055 // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
1056 Box::pin(self.client.base_client().receive_all_members(
1057 self.room_id(),
1058 &request,
1059 &response,
1060 ))
1061 .await?;
1062
1063 Ok(())
1064 })
1065 .await
1066 }
1067
1068 /// Request to update the encryption state for this room.
1069 ///
1070 /// It does nothing if the encryption state is already
1071 /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
1072 pub async fn request_encryption_state(&self) -> Result<()> {
1073 if !self.inner.encryption_state().is_unknown() {
1074 return Ok(());
1075 }
1076
1077 self.client
1078 .locks()
1079 .encryption_state_deduplicated_handler
1080 .run(self.room_id().to_owned(), async move {
1081 // Request the event from the server.
1082 let request = get_state_event_for_key::v3::Request::new(
1083 self.room_id().to_owned(),
1084 StateEventType::RoomEncryption,
1085 "".to_owned(),
1086 );
1087 let response = match self.client.send(request).await {
1088 Ok(response) => Some(
1089 response
1090 .into_content()
1091 .deserialize_as_unchecked::<PossiblyRedactedRoomEncryptionEventContent>(
1092 )?,
1093 ),
1094 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
1095 Err(err) => return Err(err.into()),
1096 };
1097
1098 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
1099
1100 // Persist the event and the fact that we requested it from the server in
1101 // `RoomInfo`.
1102 let mut room_info = self.clone_info();
1103 room_info.mark_encryption_state_synced();
1104 room_info.set_encryption_event(response);
1105 let mut changes = StateChanges::default();
1106 changes.add_room(room_info.clone());
1107
1108 self.client.state_store().save_changes(&changes).await?;
1109 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1110
1111 Ok(())
1112 })
1113 .await
1114 }
1115
1116 /// Check the encryption state of this room.
1117 ///
1118 /// If the result is [`EncryptionState::Unknown`], one might want to call
1119 /// [`Room::request_encryption_state`].
1120 pub fn encryption_state(&self) -> EncryptionState {
1121 self.inner.encryption_state()
1122 }
1123
1124 /// Force to update the encryption state by calling
1125 /// [`Room::request_encryption_state`], and then calling
1126 /// [`Room::encryption_state`].
1127 ///
1128 /// This method is useful to ensure the encryption state is up-to-date.
1129 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
1130 self.request_encryption_state().await?;
1131
1132 Ok(self.encryption_state())
1133 }
1134
/// Gets additional context info about the client crypto: device creation
/// timestamp, own-device verification, and backup status.
#[cfg(feature = "e2e-encryption")]
pub async fn crypto_context_info(&self) -> CryptoContextInfo {
    let encryption = self.client.encryption();

    let this_device_is_verified =
        if let Ok(Some(device)) = encryption.get_own_device().await {
            device.is_verified_with_cross_signing()
        } else {
            // Should not happen, there will always be an own device
            true
        };

    // A failure to query the server is reported as "no backup".
    let backup_exists_on_server =
        encryption.backups().exists_on_server().await.unwrap_or(false);

    let device_creation_ts = encryption.device_creation_timestamp().await;
    let is_backup_configured = encryption.backups().state() == BackupState::Enabled;

    CryptoContextInfo {
        device_creation_ts,
        this_device_is_verified,
        is_backup_configured,
        backup_exists_on_server,
    }
}
1157
1158 fn are_events_visible(&self) -> bool {
1159 if let RoomState::Invited = self.inner.state() {
1160 return matches!(
1161 self.inner.history_visibility_or_default(),
1162 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1163 );
1164 }
1165
1166 true
1167 }
1168
1169 /// Sync the member list with the server.
1170 ///
1171 /// This method will de-duplicate requests if it is called multiple times in
1172 /// quick succession, in that case the return value will be `None`. This
1173 /// method does nothing if the members are already synced.
1174 pub async fn sync_members(&self) -> Result<()> {
1175 if !self.are_events_visible() {
1176 return Ok(());
1177 }
1178
1179 if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1180 }
1181
1182 /// Get a specific member of this room.
1183 ///
1184 /// *Note*: This method will fetch the members from the homeserver if the
1185 /// member list isn't synchronized due to member lazy loading. Because of
1186 /// that it might panic if it isn't run on a tokio thread.
1187 ///
1188 /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1189 /// method that doesn't do any requests.
1190 ///
1191 /// # Arguments
1192 ///
1193 /// * `user_id` - The ID of the user that should be fetched out of the
1194 /// store.
1195 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1196 self.sync_members().await?;
1197 self.get_member_no_sync(user_id).await
1198 }
1199
1200 /// Get a specific member of this room.
1201 ///
1202 /// *Note*: This method will not fetch the members from the homeserver if
1203 /// the member list isn't synchronized due to member lazy loading. Thus,
1204 /// members could be missing.
1205 ///
1206 /// Use [get_member()](#method.get_member) if you want to ensure to always
1207 /// have the full member list to chose from.
1208 ///
1209 /// # Arguments
1210 ///
1211 /// * `user_id` - The ID of the user that should be fetched out of the
1212 /// store.
1213 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1214 Ok(self
1215 .inner
1216 .get_member(user_id)
1217 .await?
1218 .map(|member| RoomMember::new(self.client.clone(), member)))
1219 }
1220
1221 /// Get members for this room, with the given memberships.
1222 ///
1223 /// *Note*: This method will fetch the members from the homeserver if the
1224 /// member list isn't synchronized due to member lazy loading. Because of
1225 /// that it might panic if it isn't run on a tokio thread.
1226 ///
1227 /// Use [members_no_sync()](#method.members_no_sync) if you want a
1228 /// method that doesn't do any requests.
1229 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1230 self.sync_members().await?;
1231 self.members_no_sync(memberships).await
1232 }
1233
1234 /// Get members for this room, with the given memberships.
1235 ///
1236 /// *Note*: This method will not fetch the members from the homeserver if
1237 /// the member list isn't synchronized due to member lazy loading. Thus,
1238 /// members could be missing.
1239 ///
1240 /// Use [members()](#method.members) if you want to ensure to always get
1241 /// the full member list.
1242 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1243 Ok(self
1244 .inner
1245 .members(memberships)
1246 .await?
1247 .into_iter()
1248 .map(|member| RoomMember::new(self.client.clone(), member))
1249 .collect())
1250 }
1251
1252 /// Sets the display name of the current user within this room.
1253 ///
1254 /// *Note*: This is different to [`crate::Account::set_display_name`] which
1255 /// updates the user's display name across all of their rooms.
1256 pub async fn set_own_member_display_name(
1257 &self,
1258 display_name: Option<String>,
1259 ) -> Result<send_state_event::v3::Response> {
1260 let user_id = self.own_user_id();
1261 let member_event =
1262 self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1263
1264 let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1265 return Err(Error::InsufficientData);
1266 };
1267
1268 let event = raw_event.deserialize()?;
1269
1270 let mut content = match event {
1271 SyncStateEvent::Original(original_event) => original_event.content,
1272 SyncStateEvent::Redacted(redacted_event) => {
1273 RoomMemberEventContent::new(redacted_event.content.membership)
1274 }
1275 };
1276
1277 content.displayname = display_name;
1278 self.send_state_event_for_key(user_id, content).await
1279 }
1280
1281 /// Get all state events of a given type in this room.
1282 pub async fn get_state_events(
1283 &self,
1284 event_type: StateEventType,
1285 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1286 self.client
1287 .state_store()
1288 .get_state_events(self.room_id(), event_type)
1289 .await
1290 .map_err(Into::into)
1291 }
1292
1293 /// Get all state events of a given statically-known type in this room.
1294 ///
1295 /// # Examples
1296 ///
1297 /// ```no_run
1298 /// # async {
1299 /// # let room: matrix_sdk::Room = todo!();
1300 /// use matrix_sdk::ruma::{
1301 /// events::room::member::RoomMemberEventContent, serde::Raw,
1302 /// };
1303 ///
1304 /// let room_members =
1305 /// room.get_state_events_static::<RoomMemberEventContent>().await?;
1306 /// # anyhow::Ok(())
1307 /// # };
1308 /// ```
1309 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1310 where
1311 C: StaticEventContent<IsPrefix = ruma::events::False>
1312 + StaticStateEventContent
1313 + RedactContent,
1314 C::Redacted: RedactedStateEventContent,
1315 {
1316 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1317 }
1318
1319 /// Get the state events of a given type with the given state keys in this
1320 /// room.
1321 pub async fn get_state_events_for_keys(
1322 &self,
1323 event_type: StateEventType,
1324 state_keys: &[&str],
1325 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1326 self.client
1327 .state_store()
1328 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1329 .await
1330 .map_err(Into::into)
1331 }
1332
1333 /// Get the state events of a given statically-known type with the given
1334 /// state keys in this room.
1335 ///
1336 /// # Examples
1337 ///
1338 /// ```no_run
1339 /// # async {
1340 /// # let room: matrix_sdk::Room = todo!();
1341 /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1342 /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1343 ///
1344 /// let room_members = room
1345 /// .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1346 /// user_ids,
1347 /// )
1348 /// .await?;
1349 /// # anyhow::Ok(())
1350 /// # };
1351 /// ```
1352 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1353 &self,
1354 state_keys: I,
1355 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1356 where
1357 C: StaticEventContent<IsPrefix = ruma::events::False>
1358 + StaticStateEventContent
1359 + RedactContent,
1360 C::StateKey: Borrow<K>,
1361 C::Redacted: RedactedStateEventContent,
1362 K: AsRef<str> + Sized + Sync + 'a,
1363 I: IntoIterator<Item = &'a K> + Send,
1364 I::IntoIter: Send,
1365 {
1366 Ok(self
1367 .client
1368 .state_store()
1369 .get_state_events_for_keys_static(self.room_id(), state_keys)
1370 .await?)
1371 }
1372
1373 /// Get a specific state event in this room.
1374 pub async fn get_state_event(
1375 &self,
1376 event_type: StateEventType,
1377 state_key: &str,
1378 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1379 self.client
1380 .state_store()
1381 .get_state_event(self.room_id(), event_type, state_key)
1382 .await
1383 .map_err(Into::into)
1384 }
1385
1386 /// Get a specific state event of statically-known type with an empty state
1387 /// key in this room.
1388 ///
1389 /// # Examples
1390 ///
1391 /// ```no_run
1392 /// # async {
1393 /// # let room: matrix_sdk::Room = todo!();
1394 /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1395 ///
1396 /// let power_levels = room
1397 /// .get_state_event_static::<RoomPowerLevelsEventContent>()
1398 /// .await?
1399 /// .expect("every room has a power_levels event")
1400 /// .deserialize()?;
1401 /// # anyhow::Ok(())
1402 /// # };
1403 /// ```
1404 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1405 where
1406 C: StaticEventContent<IsPrefix = ruma::events::False>
1407 + StaticStateEventContent<StateKey = EmptyStateKey>
1408 + RedactContent,
1409 C::Redacted: RedactedStateEventContent,
1410 {
1411 self.get_state_event_static_for_key(&EmptyStateKey).await
1412 }
1413
1414 /// Get a specific state event of statically-known type in this room.
1415 ///
1416 /// # Examples
1417 ///
1418 /// ```no_run
1419 /// # async {
1420 /// # let room: matrix_sdk::Room = todo!();
1421 /// use matrix_sdk::ruma::{
1422 /// events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1423 /// };
1424 ///
1425 /// let member_event = room
1426 /// .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1427 /// "@alice:example.org"
1428 /// ))
1429 /// .await?;
1430 /// # anyhow::Ok(())
1431 /// # };
1432 /// ```
1433 pub async fn get_state_event_static_for_key<C, K>(
1434 &self,
1435 state_key: &K,
1436 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1437 where
1438 C: StaticEventContent<IsPrefix = ruma::events::False>
1439 + StaticStateEventContent
1440 + RedactContent,
1441 C::StateKey: Borrow<K>,
1442 C::Redacted: RedactedStateEventContent,
1443 K: AsRef<str> + ?Sized + Sync,
1444 {
1445 Ok(self
1446 .client
1447 .state_store()
1448 .get_state_event_static_for_key(self.room_id(), state_key)
1449 .await?)
1450 }
1451
1452 /// Returns the parents this room advertises as its parents.
1453 ///
1454 /// Results are in no particular order.
1455 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1456 // Implements this algorithm:
1457 // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1458
1459 // Get all m.space.parent events for this room
1460 Ok(self
1461 .get_state_events_static::<SpaceParentEventContent>()
1462 .await?
1463 .into_iter()
1464 // Extract state key (ie. the parent's id) and sender
1465 .filter_map(|parent_event| match parent_event.deserialize() {
1466 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1467 Some((e.state_key.to_owned(), e.sender))
1468 }
1469 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1470 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1471 Err(e) => {
1472 info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1473 None
1474 }
1475 })
1476 // Check whether the parent recognizes this room as its child
1477 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1478 let Some(parent_room) = self.client.get_room(&state_key) else {
1479 // We are not in the room, cannot check if the relationship is reciprocal
1480 // TODO: try peeking into the room
1481 return Ok(ParentSpace::Unverifiable(state_key));
1482 };
1483 // Get the m.space.child state of the parent with this room's id
1484 // as state key.
1485 if let Some(child_event) = parent_room
1486 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1487 .await?
1488 {
1489 match child_event.deserialize() {
1490 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1491 // There is a valid m.space.child in the parent pointing to
1492 // this room
1493 return Ok(ParentSpace::Reciprocal(parent_room));
1494 }
1495 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1496 Ok(SyncOrStrippedState::Stripped(_)) => {}
1497 Err(e) => {
1498 info!(
1499 room_id = ?self.room_id(), parent_room_id = ?state_key,
1500 "Could not deserialize m.space.child: {e}"
1501 );
1502 }
1503 }
1504 // Otherwise the event is either invalid or redacted. If
1505 // redacted it would be missing the
1506 // `via` key, thereby invalidating that end of the
1507 // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1508 }
1509
1510 // No reciprocal m.space.child found, let's check if the sender has the
1511 // power to set it
1512 let Some(member) = parent_room.get_member(&sender).await? else {
1513 // Sender is not even in the parent room
1514 return Ok(ParentSpace::Illegitimate(parent_room));
1515 };
1516
1517 if member.can_send_state(StateEventType::SpaceChild) {
1518 // Sender does have the power to set m.room.child
1519 Ok(ParentSpace::WithPowerlevel(parent_room))
1520 } else {
1521 Ok(ParentSpace::Illegitimate(parent_room))
1522 }
1523 })
1524 .collect::<FuturesUnordered<_>>())
1525 }
1526
1527 /// Read account data in this room, from storage.
1528 pub async fn account_data(
1529 &self,
1530 data_type: RoomAccountDataEventType,
1531 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1532 self.client
1533 .state_store()
1534 .get_room_account_data_event(self.room_id(), data_type)
1535 .await
1536 .map_err(Into::into)
1537 }
1538
1539 /// Get account data of a statically-known type in this room, from storage.
1540 ///
1541 /// # Examples
1542 ///
1543 /// ```no_run
1544 /// # async {
1545 /// # let room: matrix_sdk::Room = todo!();
1546 /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1547 ///
1548 /// match room.account_data_static::<FullyReadEventContent>().await? {
1549 /// Some(fully_read) => {
1550 /// println!("Found read marker: {:?}", fully_read.deserialize()?)
1551 /// }
1552 /// None => println!("No read marker for this room"),
1553 /// }
1554 /// # anyhow::Ok(())
1555 /// # };
1556 /// ```
1557 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1558 where
1559 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1560 {
1561 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1562 }
1563
/// Check if all the devices of the users of this room are verified.
///
/// The user IDs come from the state store, queried with
/// `RoomMemberships::empty()` (presumably meaning "no membership
/// filtering" — to be confirmed against the store's contract).
///
/// Returns true if all devices in the room are verified, otherwise false.
#[cfg(feature = "e2e-encryption")]
pub async fn contains_only_verified_devices(&self) -> Result<bool> {
    let store = self.client.state_store();
    let user_ids = store.get_user_ids(self.room_id(), RoomMemberships::empty()).await?;

    for user_id in user_ids {
        let user_devices = self.client.encryption().get_user_devices(&user_id).await?;

        // Stop at the first user owning an unverified device.
        if !user_devices.devices().all(|device| device.is_verified()) {
            return Ok(false);
        }
    }

    Ok(true)
}
1587
1588 /// Set the given account data event for this room.
1589 ///
1590 /// # Example
1591 /// ```
1592 /// # async {
1593 /// # let room: matrix_sdk::Room = todo!();
1594 /// # let event_id: ruma::OwnedEventId = todo!();
1595 /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1596 /// let content = FullyReadEventContent::new(event_id);
1597 ///
1598 /// room.set_account_data(content).await?;
1599 /// # anyhow::Ok(())
1600 /// # };
1601 /// ```
1602 pub async fn set_account_data<T>(
1603 &self,
1604 content: T,
1605 ) -> Result<set_room_account_data::v3::Response>
1606 where
1607 T: RoomAccountDataEventContent,
1608 {
1609 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1610
1611 let request = set_room_account_data::v3::Request::new(
1612 own_user.to_owned(),
1613 self.room_id().to_owned(),
1614 &content,
1615 )?;
1616
1617 Ok(self.client.send(request).await?)
1618 }
1619
1620 /// Set the given raw account data event in this room.
1621 ///
1622 /// # Example
1623 /// ```
1624 /// # async {
1625 /// # let room: matrix_sdk::Room = todo!();
1626 /// use matrix_sdk::ruma::{
1627 /// events::{
1628 /// AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1629 /// marked_unread::MarkedUnreadEventContent,
1630 /// },
1631 /// serde::Raw,
1632 /// };
1633 /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1634 /// let full_event: AnyRoomAccountDataEventContent =
1635 /// marked_unread_content.clone().into();
1636 /// room.set_account_data_raw(
1637 /// marked_unread_content.event_type(),
1638 /// Raw::new(&full_event).unwrap(),
1639 /// )
1640 /// .await?;
1641 /// # anyhow::Ok(())
1642 /// # };
1643 /// ```
1644 pub async fn set_account_data_raw(
1645 &self,
1646 event_type: RoomAccountDataEventType,
1647 content: Raw<AnyRoomAccountDataEventContent>,
1648 ) -> Result<set_room_account_data::v3::Response> {
1649 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1650
1651 let request = set_room_account_data::v3::Request::new_raw(
1652 own_user.to_owned(),
1653 self.room_id().to_owned(),
1654 event_type,
1655 content,
1656 );
1657
1658 Ok(self.client.send(request).await?)
1659 }
1660
1661 /// Adds a tag to the room, or updates it if it already exists.
1662 ///
1663 /// Returns the [`create_tag::v3::Response`] from the server.
1664 ///
1665 /// # Arguments
1666 /// * `tag` - The tag to add or update.
1667 ///
1668 /// * `tag_info` - Information about the tag, generally containing the
1669 /// `order` parameter.
1670 ///
1671 /// # Examples
1672 ///
1673 /// ```no_run
1674 /// # use std::str::FromStr;
1675 /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1676 /// # async {
1677 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1678 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1679 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1680 /// use matrix_sdk::ruma::events::tag::TagInfo;
1681 ///
1682 /// if let Some(room) = client.get_room(&room_id) {
1683 /// let mut tag_info = TagInfo::new();
1684 /// tag_info.order = Some(0.9);
1685 /// let user_tag = UserTagName::from_str("u.work")?;
1686 ///
1687 /// room.set_tag(TagName::User(user_tag), tag_info).await?;
1688 /// }
1689 /// # anyhow::Ok(()) };
1690 /// ```
1691 pub async fn set_tag(
1692 &self,
1693 tag: TagName,
1694 tag_info: TagInfo,
1695 ) -> Result<create_tag::v3::Response> {
1696 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1697 let request = create_tag::v3::Request::new(
1698 user_id.to_owned(),
1699 self.inner.room_id().to_owned(),
1700 tag.to_string(),
1701 tag_info,
1702 );
1703 Ok(self.client.send(request).await?)
1704 }
1705
1706 /// Removes a tag from the room.
1707 ///
1708 /// Returns the [`delete_tag::v3::Response`] from the server.
1709 ///
1710 /// # Arguments
1711 /// * `tag` - The tag to remove.
1712 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1713 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1714 let request = delete_tag::v3::Request::new(
1715 user_id.to_owned(),
1716 self.inner.room_id().to_owned(),
1717 tag.to_string(),
1718 );
1719 Ok(self.client.send(request).await?)
1720 }
1721
1722 /// Add or remove the `m.favourite` flag for this room.
1723 ///
1724 /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1725 /// room, the tag will be removed too.
1726 ///
1727 /// # Arguments
1728 ///
1729 /// * `is_favourite` - Whether to mark this room as favourite.
1730 /// * `tag_order` - The order of the tag if any.
1731 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1732 if is_favourite {
1733 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1734
1735 self.set_tag(TagName::Favorite, tag_info).await?;
1736
1737 if self.is_low_priority() {
1738 self.remove_tag(TagName::LowPriority).await?;
1739 }
1740 } else {
1741 self.remove_tag(TagName::Favorite).await?;
1742 }
1743 Ok(())
1744 }
1745
1746 /// Add or remove the `m.lowpriority` flag for this room.
1747 ///
1748 /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1749 /// room, the tag will be removed too.
1750 ///
1751 /// # Arguments
1752 ///
1753 /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1754 /// * `tag_order` - The order of the tag if any.
1755 pub async fn set_is_low_priority(
1756 &self,
1757 is_low_priority: bool,
1758 tag_order: Option<f64>,
1759 ) -> Result<()> {
1760 if is_low_priority {
1761 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1762
1763 self.set_tag(TagName::LowPriority, tag_info).await?;
1764
1765 if self.is_favourite() {
1766 self.remove_tag(TagName::Favorite).await?;
1767 }
1768 } else {
1769 self.remove_tag(TagName::LowPriority).await?;
1770 }
1771 Ok(())
1772 }
1773
1774 /// Sets whether this room is a DM.
1775 ///
1776 /// When setting this room as DM, it will be marked as DM for all active
1777 /// members of the room. When unsetting this room as DM, it will be
1778 /// unmarked as DM for all users, not just the members.
1779 ///
1780 /// # Arguments
1781 /// * `is_direct` - Whether to mark this room as direct.
1782 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1783 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1784
1785 let mut content = self
1786 .client
1787 .account()
1788 .account_data::<DirectEventContent>()
1789 .await?
1790 .map(|c| c.deserialize())
1791 .transpose()?
1792 .unwrap_or_default();
1793
1794 let this_room_id = self.inner.room_id();
1795
1796 if is_direct {
1797 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1798 room_members.retain(|member| member.user_id() != self.own_user_id());
1799
1800 for member in room_members {
1801 let entry = content.entry(member.user_id().into()).or_default();
1802 if !entry.iter().any(|room_id| room_id == this_room_id) {
1803 entry.push(this_room_id.to_owned());
1804 }
1805 }
1806 } else {
1807 for (_, list) in content.iter_mut() {
1808 list.retain(|room_id| *room_id != this_room_id);
1809 }
1810
1811 // Remove user ids that don't have any room marked as DM
1812 content.retain(|_, list| !list.is_empty());
1813 }
1814
1815 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1816
1817 self.client.send(request).await?;
1818 Ok(())
1819 }
1820
/// Tries to decrypt a room event.
///
/// # Arguments
/// * `event` - The room event to be decrypted.
/// * `push_ctx` - If provided, used to compute the push actions of the
///   decrypted event.
///
/// Returns the decrypted event. In the case of a decryption error, returns
/// a `TimelineEvent` representing the decryption error, after asking the
/// backups machinery to maybe download the room key.
///
/// Fails with [`Error::NoOlmMachine`] if there is no Olm machine.
#[cfg(feature = "e2e-encryption")]
#[cfg(not(feature = "experimental-encrypted-state-events"))]
pub async fn decrypt_event(
    &self,
    event: &Raw<OriginalSyncRoomEncryptedEvent>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_machine = self.client.olm_machine().await;
    let Some(machine) = olm_machine.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let decryption_result = machine
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match decryption_result {
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(push_ctx) => Some(push_ctx.for_event(&decrypted.event).await),
                None => None,
            };

            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }

        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            let backups = self.client.encryption().backups();
            backups.maybe_download_room_key(self.room_id().to_owned(), event.clone());

            Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
        }
    }
}
1863
/// Tries to decrypt a room event.
///
/// # Arguments
/// * `event` - The room event to be decrypted.
/// * `push_ctx` - If provided, used to compute the push actions of the
///   decrypted event.
///
/// Returns the decrypted event. In the case of a decryption error, returns
/// a `TimelineEvent` representing the decryption error, after asking the
/// backups machinery to maybe download the room key.
///
/// Fails with [`Error::NoOlmMachine`] if there is no Olm machine.
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
    &self,
    event: &Raw<T>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_machine = self.client.olm_machine().await;
    let Some(machine) = olm_machine.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let decryption_result = machine
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match decryption_result {
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(push_ctx) => Some(push_ctx.for_event(&decrypted.event).await),
                None => None,
            };

            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }

        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            let backups = self.client.encryption().backups();
            backups.maybe_download_room_key(self.room_id().to_owned(), event.clone());

            // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
            // event.
            Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
        }
    }
}
1907
/// Looks up the [`EncryptionInfo`] of an event that was decrypted with the
/// given Megolm session.
///
/// This is useful when a session update is received and the messages that
/// were encrypted with that session should reflect it, e.g. to remove a
/// warning shield because a device is now verified.
///
/// # Arguments
///
/// * `session_id` - The ID of the Megolm session to get information for.
///
/// * `sender` - The (claimed) sender of the event where the session was
///   used.
///
/// Returns `None` when no Olm machine is available or when the lookup
/// fails.
#[cfg(feature = "e2e-encryption")]
pub async fn get_encryption_info(
    &self,
    session_id: &str,
    sender: &UserId,
) -> Option<Arc<EncryptionInfo>> {
    let olm_guard = self.client.olm_machine().await;
    let olm = olm_guard.as_ref()?;

    // Lookup errors are deliberately flattened into `None`.
    let lookup = olm.get_session_encryption_info(self.room_id(), session_id, sender).await;
    lookup.ok()
}
1930
/// Forces a rotation of the currently active room key, i.e. the key used
/// to encrypt messages.
///
/// A new room key will be created and shared with all the room members the
/// next time a message is sent. Calling this method is not required: room
/// keys are rotated automatically when necessary. It remains useful for
/// debugging purposes.
///
/// For more info please take a look at the [`encryption`] module
/// documentation.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available.
///
/// [`encryption`]: crate::encryption
#[cfg(feature = "e2e-encryption")]
pub async fn discard_room_key(&self) -> Result<()> {
    let olm_guard = self.client.olm_machine().await;
    let olm = olm_guard.as_ref().ok_or(Error::NoOlmMachine)?;

    olm.discard_room_key(self.inner.room_id()).await?;
    Ok(())
}
1953
1954 /// Ban the user with `UserId` from this room.
1955 ///
1956 /// # Arguments
1957 ///
1958 /// * `user_id` - The user to ban with `UserId`.
1959 ///
1960 /// * `reason` - The reason for banning this user.
1961 #[instrument(skip_all)]
1962 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1963 let request = assign!(
1964 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1965 { reason: reason.map(ToOwned::to_owned) }
1966 );
1967 self.client.send(request).await?;
1968 Ok(())
1969 }
1970
1971 /// Unban the user with `UserId` from this room.
1972 ///
1973 /// # Arguments
1974 ///
1975 /// * `user_id` - The user to unban with `UserId`.
1976 ///
1977 /// * `reason` - The reason for unbanning this user.
1978 #[instrument(skip_all)]
1979 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1980 let request = assign!(
1981 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1982 { reason: reason.map(ToOwned::to_owned) }
1983 );
1984 self.client.send(request).await?;
1985 Ok(())
1986 }
1987
1988 /// Kick a user out of this room.
1989 ///
1990 /// # Arguments
1991 ///
1992 /// * `user_id` - The `UserId` of the user that should be kicked out of the
1993 /// room.
1994 ///
1995 /// * `reason` - Optional reason why the room member is being kicked out.
1996 #[instrument(skip_all)]
1997 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1998 let request = assign!(
1999 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
2000 { reason: reason.map(ToOwned::to_owned) }
2001 );
2002 self.client.send(request).await?;
2003 Ok(())
2004 }
2005
2006 /// Invite the specified user by `UserId` to this room.
2007 ///
2008 /// # Arguments
2009 ///
2010 /// * `user_id` - The `UserId` of the user to invite to the room.
2011 #[instrument(skip_all)]
2012 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
2013 #[cfg(feature = "e2e-encryption")]
2014 if self.client.inner.enable_share_history_on_invite {
2015 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
2016 }
2017
2018 let recipient = InvitationRecipient::UserId(InviteUserId::new(user_id.to_owned()));
2019 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2020 self.client.send(request).await?;
2021
2022 // Force a future room members reload before sending any event to prevent UTDs
2023 // that can happen when some event is sent after a room member has been invited
2024 // but before the /sync request could fetch the membership change event.
2025 self.mark_members_missing();
2026
2027 Ok(())
2028 }
2029
2030 /// Invite the specified user by third party id to this room.
2031 ///
2032 /// # Arguments
2033 ///
2034 /// * `invite_id` - A third party id of a user to invite to the room.
2035 #[instrument(skip_all)]
2036 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
2037 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
2038 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2039 self.client.send(request).await?;
2040
2041 // Force a future room members reload before sending any event to prevent UTDs
2042 // that can happen when some event is sent after a room member has been invited
2043 // but before the /sync request could fetch the membership change event.
2044 self.mark_members_missing();
2045
2046 Ok(())
2047 }
2048
2049 /// Activate typing notice for this room.
2050 ///
2051 /// The typing notice remains active for 4s. It can be deactivate at any
2052 /// point by setting typing to `false`. If this method is called while
2053 /// the typing notice is active nothing will happen. This method can be
2054 /// called on every key stroke, since it will do nothing while typing is
2055 /// active.
2056 ///
2057 /// # Arguments
2058 ///
2059 /// * `typing` - Whether the user is typing or has stopped typing.
2060 ///
2061 /// # Examples
2062 ///
2063 /// ```no_run
2064 /// use std::time::Duration;
2065 ///
2066 /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
2067 /// # use matrix_sdk::{
2068 /// # Client, config::SyncSettings,
2069 /// # ruma::room_id,
2070 /// # };
2071 /// # use url::Url;
2072 ///
2073 /// # async {
2074 /// # let homeserver = Url::parse("http://localhost:8080")?;
2075 /// # let client = Client::new(homeserver).await?;
2076 /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2077 ///
2078 /// if let Some(room) = client.get_room(&room_id) {
2079 /// room.typing_notice(true).await?
2080 /// }
2081 /// # anyhow::Ok(()) };
2082 /// ```
2083 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
2084 self.ensure_room_joined()?;
2085
2086 // Only send a request to the homeserver if the old timeout has elapsed
2087 // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
2088 let send = if let Some(typing_time) =
2089 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
2090 {
2091 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
2092 // We always reactivate the typing notice if typing is true or
2093 // we may need to deactivate it if it's
2094 // currently active if typing is false
2095 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
2096 } else {
2097 // Only send a request when we need to deactivate typing
2098 !typing
2099 }
2100 } else {
2101 // Typing notice is currently deactivated, therefore, send a request
2102 // only when it's about to be activated
2103 typing
2104 };
2105
2106 if send {
2107 self.send_typing_notice(typing).await?;
2108 }
2109
2110 Ok(())
2111 }
2112
2113 #[instrument(name = "typing_notice", skip(self))]
2114 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
2115 let typing = if typing {
2116 self.client
2117 .inner
2118 .typing_notice_times
2119 .write()
2120 .unwrap()
2121 .insert(self.room_id().to_owned(), Instant::now());
2122 Typing::Yes(TypingInfo::new(TYPING_NOTICE_TIMEOUT))
2123 } else {
2124 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
2125 Typing::No
2126 };
2127
2128 let request = create_typing_event::v3::Request::new(
2129 self.own_user_id().to_owned(),
2130 self.room_id().to_owned(),
2131 typing,
2132 );
2133
2134 self.client.send(request).await?;
2135
2136 Ok(())
2137 }
2138
2139 /// Send a request to set a single receipt.
2140 ///
2141 /// If an unthreaded receipt is sent, this will also unset the unread flag
2142 /// of the room if necessary.
2143 ///
2144 /// # Arguments
2145 ///
2146 /// * `receipt_type` - The type of the receipt to set. Note that it is
2147 /// possible to set the fully-read marker although it is technically not a
2148 /// receipt.
2149 ///
2150 /// * `thread` - The thread where this receipt should apply, if any. Note
2151 /// that this must be [`ReceiptThread::Unthreaded`] when sending a
2152 /// [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
2153 ///
2154 /// * `event_id` - The `EventId` of the event to set the receipt on.
2155 #[instrument(skip_all)]
2156 pub async fn send_single_receipt(
2157 &self,
2158 receipt_type: create_receipt::v3::ReceiptType,
2159 thread: ReceiptThread,
2160 event_id: OwnedEventId,
2161 ) -> Result<()> {
2162 // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
2163 // string key.
2164 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2165
2166 self.client
2167 .inner
2168 .locks
2169 .read_receipt_deduplicated_handler
2170 .run((request_key, event_id.clone()), async {
2171 // We will unset the unread flag if we send an unthreaded receipt.
2172 let is_unthreaded = thread == ReceiptThread::Unthreaded;
2173
2174 let mut request = create_receipt::v3::Request::new(
2175 self.room_id().to_owned(),
2176 receipt_type,
2177 event_id,
2178 );
2179 request.thread = thread;
2180
2181 self.client.send(request).await?;
2182
2183 if is_unthreaded {
2184 self.set_unread_flag(false).await?;
2185 }
2186
2187 Ok(())
2188 })
2189 .await
2190 }
2191
2192 /// Send a request to set multiple receipts at once.
2193 ///
2194 /// This will also unset the unread flag of the room if necessary.
2195 ///
2196 /// # Arguments
2197 ///
2198 /// * `receipts` - The `Receipts` to send.
2199 ///
2200 /// If `receipts` is empty, this is a no-op.
2201 #[instrument(skip_all)]
2202 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2203 if receipts.is_empty() {
2204 return Ok(());
2205 }
2206
2207 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2208 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2209 fully_read,
2210 read_receipt: public_read_receipt,
2211 private_read_receipt,
2212 });
2213
2214 self.client.send(request).await?;
2215
2216 self.set_unread_flag(false).await?;
2217
2218 Ok(())
2219 }
2220
2221 /// Helper function to enable End-to-end encryption in this room.
2222 /// `encrypted_state_events` is not used unless the
2223 /// `experimental-encrypted-state-events` feature is enabled.
2224 #[allow(unused_variables, unused_mut)]
2225 async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2226 use ruma::{
2227 EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2228 };
2229 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2230
2231 if !self.latest_encryption_state().await?.is_encrypted() {
2232 let mut content =
2233 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2234 #[cfg(feature = "experimental-encrypted-state-events")]
2235 if encrypted_state_events {
2236 content = content.with_encrypted_state();
2237 }
2238 self.send_state_event(content).await?;
2239
2240 // Spin on the sync beat event, since the first sync we receive might not
2241 // include the encryption event.
2242 //
2243 // TODO do we want to return an error here if we time out? This
2244 // could be quite useful if someone wants to enable encryption and
2245 // send a message right after it's enabled.
2246 let res = timeout(
2247 async {
2248 loop {
2249 // Listen for sync events, then check if the encryption state is known.
2250 self.client.inner.sync_beat.listen().await;
2251 let _state_store_lock =
2252 self.client.base_client().state_store_lock().lock().await;
2253
2254 if !self.inner.encryption_state().is_unknown() {
2255 break;
2256 }
2257 }
2258 },
2259 SYNC_WAIT_TIME,
2260 )
2261 .await;
2262
2263 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2264
2265 // If encryption was enabled, return.
2266 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2267 if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2268 debug!("room successfully marked as encrypted");
2269 return Ok(());
2270 }
2271
2272 // If encryption with state event encryption was enabled, return.
2273 #[cfg(feature = "experimental-encrypted-state-events")]
2274 if res.is_ok() && {
2275 if encrypted_state_events {
2276 self.inner.encryption_state().is_state_encrypted()
2277 } else {
2278 self.inner.encryption_state().is_encrypted()
2279 }
2280 } {
2281 debug!("room successfully marked as encrypted");
2282 return Ok(());
2283 }
2284
2285 // If after waiting for multiple syncs, we don't have the encryption state we
2286 // expect, assume the local encryption state is incorrect; this will
2287 // cause the SDK to re-request it later for confirmation, instead of
2288 // assuming it's sync'd and correct (and not encrypted).
2289 debug!("still not marked as encrypted, marking encryption state as missing");
2290
2291 let mut room_info = self.clone_info();
2292 room_info.mark_encryption_state_missing();
2293 let mut changes = StateChanges::default();
2294 changes.add_room(room_info.clone());
2295
2296 self.client.state_store().save_changes(&changes).await?;
2297 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2298 }
2299
2300 Ok(())
2301 }
2302
/// Enable End-to-end encryption in this room.
///
/// This method will be a noop if encryption is already enabled, otherwise
/// sends a `m.room.encryption` state event to the room. This might fail if
/// you don't have the appropriate power level to enable end-to-end
/// encryption.
///
/// A sync needs to be received to update the local room state. This method
/// will wait for a sync to be received, this might time out if no
/// sync loop is running or if the server is slow.
///
/// If the wait times out, or the room is still not seen as encrypted
/// afterwards, no error is returned: the local encryption state is marked
/// as missing instead, so that it gets re-requested later.
///
/// State event encryption is not requested by this method.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::{
/// #     Client, config::SyncSettings,
/// #     ruma::room_id,
/// # };
/// # use url::Url;
/// #
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let client = Client::new(homeserver).await?;
/// # let room_id = room_id!("!test:localhost");
/// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
///
/// if let Some(room) = client.get_room(&room_id) {
///     room.enable_encryption().await?
/// }
/// # anyhow::Ok(()) };
/// ```
#[instrument(skip_all)]
pub async fn enable_encryption(&self) -> Result<()> {
    // `false`: do not opt into state event encryption.
    self.enable_encryption_inner(false).await
}
2338
/// Enable End-to-end encryption in this room, opting into experimental
/// state event encryption.
///
/// This method will be a noop if encryption is already enabled, otherwise
/// sends a `m.room.encryption` state event to the room. This might fail if
/// you don't have the appropriate power level to enable end-to-end
/// encryption.
///
/// A sync needs to be received to update the local room state. This method
/// will wait for a sync to be received, this might time out if no
/// sync loop is running or if the server is slow.
///
/// If the wait times out, or the room is still not seen as encrypted with
/// state event encryption afterwards, no error is returned: the local
/// encryption state is marked as missing instead, so that it gets
/// re-requested later.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::{
/// #     Client, config::SyncSettings,
/// #     ruma::room_id,
/// # };
/// # use url::Url;
/// #
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let client = Client::new(homeserver).await?;
/// # let room_id = room_id!("!test:localhost");
/// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
///
/// if let Some(room) = client.get_room(&room_id) {
///     room.enable_encryption_with_state_event_encryption().await?
/// }
/// # anyhow::Ok(()) };
/// ```
#[instrument(skip_all)]
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
    // `true`: opt into state event encryption.
    self.enable_encryption_inner(true).await
}
2376
/// Share a room key with the users in this room.
///
/// Olm sessions are created with all the user/device pairs of the room if
/// necessary, and a room key is then shared with them.
///
/// Does nothing if no room key needs to be shared.
// TODO: expose this publicly so people can pre-share a group session if
// e.g. a user starts to type a message for a room.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all, fields(room_id = ?self.room_id()))]
async fn preshare_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    // Take and release the lock on the store, if needs be.
    let _guard = self.client.encryption().spin_lock_store(Some(60000)).await?;

    let room_id = self.room_id();

    self.client
        .locks()
        .group_session_deduplicated_handler
        .run(room_id.to_owned(), async move {
            // Claim one-time keys for the active members of the room.
            let members = self
                .client
                .state_store()
                .get_user_ids(room_id, RoomMemberships::ACTIVE)
                .await?;
            self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;

            let Err(share_error) = self.share_room_key().await else {
                return Ok(());
            };

            // Sharing failed: invalidate the group session, as using it
            // would end up in undecryptable messages.
            let olm_guard = self.client.olm_machine().await;
            if let Some(olm) = olm_guard.as_ref() {
                olm.discard_room_key(room_id).await?;
            }

            Err(share_error)
        })
        .await
}
2423
/// Share a group session for this room.
///
/// The to-device requests needed to share the room key are sent one after
/// the other, and each one is marked as sent once its response arrived.
///
/// # Panics
///
/// Panics if the client isn't logged in.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
async fn share_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    let to_device_requests = self.client.base_client().share_room_key(self.room_id()).await?;

    // The first failure aborts the remaining requests.
    for to_device_request in to_device_requests {
        let sent = self.client.send_to_device(&to_device_request).await?;
        self.client.mark_request_as_sent(&to_device_request.txn_id, &sent).await?;
    }

    Ok(())
}
2443
2444 /// Wait for the room to be fully synced.
2445 ///
2446 /// This method makes sure the room that was returned when joining a room
2447 /// has been echoed back in the sync.
2448 ///
2449 /// Warning: This waits until a sync happens and does not return if no sync
2450 /// is happening. It can also return early when the room is not a joined
2451 /// room anymore.
2452 #[instrument(skip_all)]
2453 pub async fn sync_up(&self) {
2454 while !self.is_synced() && self.state() == RoomState::Joined {
2455 let wait_for_beat = self.client.inner.sync_beat.listen();
2456 // We don't care whether it's a timeout or a sync beat.
2457 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2458 }
2459 }
2460
/// Send a message-like event to this room.
///
/// Returns the parsed response from the server.
///
/// If the encryption feature is enabled this method will transparently
/// encrypt the event if this room is encrypted (except for `m.reaction`
/// events, which are never encrypted).
///
/// **Note**: If you just want to send an event with custom JSON content to
/// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
///
/// If you want to set a transaction ID for the event, use
/// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
/// on the returned value before `.await`ing it.
///
/// This method itself only builds the returned [`SendMessageLikeEvent`];
/// it has to be `.await`ed for the event to be sent.
///
/// # Arguments
///
/// * `content` - The content of the message event.
///
/// # Examples
///
/// ```no_run
/// # use std::sync::{Arc, RwLock};
/// # use matrix_sdk::{Client, config::SyncSettings};
/// # use url::Url;
/// # use matrix_sdk::ruma::room_id;
/// # use serde::{Deserialize, Serialize};
/// use matrix_sdk::ruma::{
///     MilliSecondsSinceUnixEpoch, TransactionId,
///     events::{
///         macros::EventContent,
///         room::message::{RoomMessageEventContent, TextMessageEventContent},
///     },
///     uint,
/// };
///
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let mut client = Client::new(homeserver).await?;
/// # let room_id = room_id!("!test:localhost");
/// let content = RoomMessageEventContent::text_plain("Hello world");
/// let txn_id = TransactionId::new();
///
/// if let Some(room) = client.get_room(&room_id) {
///     room.send(content).with_transaction_id(txn_id).await?;
/// }
///
/// // Custom events work too:
/// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
/// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
/// struct TokenEventContent {
///     token: String,
///     #[serde(rename = "exp")]
///     expires_at: MilliSecondsSinceUnixEpoch,
/// }
///
/// # fn generate_token() -> String { todo!() }
/// let content = TokenEventContent {
///     token: generate_token(),
///     expires_at: {
///         let now = MilliSecondsSinceUnixEpoch::now();
///         MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
///     },
/// };
///
/// if let Some(room) = client.get_room(&room_id) {
///     room.send(content).await?;
/// }
/// # anyhow::Ok(()) };
/// ```
pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
    SendMessageLikeEvent::new(self, content)
}
2534
/// Run /keys/query requests for all the non-tracked users, and for users
/// with an out-of-date device list.
///
/// Only the active members of this room are considered. No request is
/// sent when none of them needs a key query.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the state store, the crypto store and the
/// `/keys/query` request.
#[cfg(feature = "e2e-encryption")]
async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
    let olm = self.client.olm_machine().await;
    // A missing Olm machine is reported as an error instead of a panic,
    // like the other methods of this type that need the machine.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let members =
        self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;

    // Map every tracked user to its `dirty` flag.
    let tracked: HashMap<_, _> = olm
        .store()
        .load_tracked_users()
        .await?
        .into_iter()
        .map(|tracked| (tracked.user_id, tracked.dirty))
        .collect();

    // A member has no unknown devices iff it was tracked *and* the tracking is
    // not considered dirty.
    let members_with_unknown_devices =
        members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));

    let (req_id, request) =
        olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));

    // Skip the request entirely when there is nothing to query.
    if !request.device_keys.is_empty() {
        self.client.keys_query(&req_id, request.device_keys).await?;
    }

    Ok(())
}
2567
/// Send a message-like event with custom JSON content to this room.
///
/// Returns the parsed response from the server.
///
/// If the encryption feature is enabled this method will transparently
/// encrypt the event if this room is encrypted (except for `m.reaction`
/// events, which are never encrypted).
///
/// This method is equivalent to the [`send()`][Self::send] method but
/// allows sending custom JSON payloads, e.g. constructed using the
/// [`serde_json::json!()`] macro.
///
/// If you want to set a transaction ID for the event, use
/// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
/// on the returned value before `.await`ing it.
///
/// This method itself only builds the returned
/// [`SendRawMessageLikeEvent`]; it has to be `.await`ed for the event to be
/// sent.
///
/// # Arguments
///
/// * `event_type` - The type of the event.
///
/// * `content` - The content of the event as a raw JSON value. The argument
///   type can be `serde_json::Value`, but also other raw JSON types; for
///   the full list check the documentation of
///   [`IntoRawMessageLikeEventContent`].
///
/// # Examples
///
/// ```no_run
/// # use std::sync::{Arc, RwLock};
/// # use matrix_sdk::{Client, config::SyncSettings};
/// # use url::Url;
/// # use matrix_sdk::ruma::room_id;
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let mut client = Client::new(homeserver).await?;
/// # let room_id = room_id!("!test:localhost");
/// use serde_json::json;
///
/// if let Some(room) = client.get_room(&room_id) {
///     room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
/// }
/// # anyhow::Ok(()) };
/// ```
#[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
pub fn send_raw<'a>(
    &'a self,
    event_type: &'a str,
    content: impl IntoRawMessageLikeEventContent,
) -> SendRawMessageLikeEvent<'a> {
    // Note: the recorded instrument fields are saved in
    // `SendRawMessageLikeEvent::into_future`.
    SendRawMessageLikeEvent::new(self, event_type, content)
}
2621
/// Send an attachment to this room.
///
/// This will upload the given data using the [`upload()`] method and post
/// an event to the given room.
/// If the room is encrypted and the encryption feature is enabled the
/// upload will be encrypted.
///
/// This is a convenience method that calls the
/// [`upload()`] and afterwards the [`send()`].
///
/// # Arguments
/// * `filename` - The file name.
///
/// * `content_type` - The type of the media, this will be used as the
///   content-type header.
///
/// * `data` - The raw bytes of the media.
///
/// * `config` - Metadata and configuration for the attachment.
///
/// # Examples
///
/// ```no_run
/// # use std::fs;
/// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
/// # use url::Url;
/// # use mime;
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let mut client = Client::new(homeserver).await?;
/// # let room_id = room_id!("!test:localhost");
/// let mut image = fs::read("/home/example/my-cat.jpg")?;
///
/// if let Some(room) = client.get_room(&room_id) {
///     room.send_attachment(
///         "my_favorite_cat.jpg",
///         &mime::IMAGE_JPEG,
///         image,
///         AttachmentConfig::new(),
///     ).await?;
/// }
/// # anyhow::Ok(()) };
/// ```
///
/// [`upload()`]: crate::Media::upload
/// [`send()`]: Self::send
#[instrument(skip_all)]
pub fn send_attachment<'a>(
    &'a self,
    filename: impl Into<String>,
    content_type: &'a Mime,
    data: Vec<u8>,
    config: AttachmentConfig,
) -> SendAttachment<'a> {
    // The file name is converted to an owned `String` right away.
    SendAttachment::new(self, filename.into(), content_type, data, config)
}
2679
2680 /// Prepare and send an attachment to this room.
2681 ///
2682 /// This will upload the given data that the reader produces using the
2683 /// [`upload()`](#method.upload) method and post an event to the given room.
2684 /// If the room is encrypted and the encryption feature is enabled the
2685 /// upload will be encrypted.
2686 ///
2687 /// This is a convenience method that calls the
2688 /// [`Client::upload()`](#Client::method.upload) and afterwards the
2689 /// [`send()`](#method.send).
2690 ///
2691 /// # Arguments
2692 /// * `filename` - The file name.
2693 ///
2694 /// * `content_type` - The type of the media, this will be used as the
2695 /// content-type header.
2696 ///
2697 /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2698 /// media.
2699 ///
2700 /// * `config` - Metadata and configuration for the attachment.
2701 ///
2702 /// * `send_progress` - An observable to transmit forward progress about the
2703 /// upload.
2704 ///
2705 /// * `store_in_cache` - A boolean defining whether the uploaded media will
2706 /// be stored in the cache immediately after a successful upload.
2707 #[instrument(skip_all)]
2708 pub(super) async fn prepare_and_send_attachment<'a>(
2709 &'a self,
2710 filename: String,
2711 content_type: &'a Mime,
2712 data: Vec<u8>,
2713 mut config: AttachmentConfig,
2714 send_progress: SharedObservable<TransmissionProgress>,
2715 store_in_cache: bool,
2716 ) -> Result<send_message_event::v3::Response> {
2717 self.ensure_room_joined()?;
2718
2719 let txn_id = config.txn_id.take();
2720 let mentions = config.mentions.take();
2721
2722 let thumbnail = config.thumbnail.take();
2723
2724 // If necessary, store caching data for the thumbnail ahead of time.
2725 let thumbnail_cache_info = if store_in_cache {
2726 thumbnail
2727 .as_ref()
2728 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2729 } else {
2730 None
2731 };
2732
2733 #[cfg(feature = "e2e-encryption")]
2734 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2735 self.client
2736 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2737 .await?
2738 } else {
2739 self.client
2740 .media()
2741 .upload_plain_media_and_thumbnail(
2742 content_type,
2743 // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2744 // similar.
2745 data.clone(),
2746 thumbnail,
2747 send_progress,
2748 )
2749 .await?
2750 };
2751
2752 #[cfg(not(feature = "e2e-encryption"))]
2753 let (media_source, thumbnail) = self
2754 .client
2755 .media()
2756 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2757 .await?;
2758
2759 if store_in_cache {
2760 let media_store_lock_guard = self.client.media_store().lock().await?;
2761
2762 // A failure to cache shouldn't prevent the whole upload from finishing
2763 // properly, so only log errors during caching.
2764
2765 debug!("caching the media");
2766 let request =
2767 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2768
2769 if let Err(err) = media_store_lock_guard
2770 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2771 .await
2772 {
2773 warn!("unable to cache the media after uploading it: {err}");
2774 }
2775
2776 if let Some(((data, height, width), source)) =
2777 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2778 {
2779 debug!("caching the thumbnail");
2780
2781 let request = MediaRequestParameters {
2782 source: source.clone(),
2783 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2784 };
2785
2786 if let Err(err) = media_store_lock_guard
2787 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2788 .await
2789 {
2790 warn!("unable to cache the media after uploading it: {err}");
2791 }
2792 }
2793 }
2794
2795 let content = self
2796 .make_media_event(
2797 Room::make_attachment_type(
2798 content_type,
2799 filename,
2800 media_source,
2801 config.caption,
2802 config.info,
2803 thumbnail,
2804 ),
2805 mentions,
2806 config.reply,
2807 )
2808 .await?;
2809
2810 let mut fut = self.send(content);
2811 if let Some(txn_id) = txn_id {
2812 fut = fut.with_transaction_id(txn_id);
2813 }
2814
2815 fut.await.map(|result| result.response)
2816 }
2817
/// Creates the inner [`MessageType`] for an already-uploaded media file
/// provided by its source.
///
/// This is a thin wrapper: every argument is forwarded, unchanged, to the
/// `make_media_type!` macro with [`MessageType`] as the target type. How
/// the arguments are combined is decided by that macro.
///
/// # Arguments
///
/// * `content_type` - The MIME type of the media file.
///
/// * `filename` - The name of the media file.
///
/// * `source` - The source of the already-uploaded media file.
///
/// * `caption` - An optional caption for the media.
///
/// * `info` - Optional attachment info for the media.
///
/// * `thumbnail` - An optional thumbnail, given as its source and its
///   thumbnail info.
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_attachment_type(
    content_type: &Mime,
    filename: String,
    source: MediaSource,
    caption: Option<TextMessageEventContent>,
    info: Option<AttachmentInfo>,
    thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
) -> MessageType {
    make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
}
2831
2832 /// Creates the [`RoomMessageEventContent`] based on the message type,
2833 /// mentions and reply information.
2834 pub(crate) async fn make_media_event(
2835 &self,
2836 msg_type: MessageType,
2837 mentions: Option<Mentions>,
2838 reply: Option<Reply>,
2839 ) -> Result<RoomMessageEventContent> {
2840 let mut content = RoomMessageEventContent::new(msg_type);
2841 if let Some(mentions) = mentions {
2842 content = content.add_mentions(mentions);
2843 }
2844 if let Some(reply) = reply {
2845 // Since we just created the event, there is no relation attached to it. Thus,
2846 // it is safe to add the reply relation without overriding anything.
2847 content = self.make_reply_event(content.into(), reply).await?;
2848 }
2849 Ok(content)
2850 }
2851
/// Creates the inner [`GalleryItemType`] for an already-uploaded media file
/// provided by its source.
///
/// This is a thin wrapper: every argument is forwarded, unchanged, to the
/// `make_media_type!` macro with [`GalleryItemType`] as the target type.
/// How the arguments are combined is decided by that macro.
///
/// Only compiled when the `unstable-msc4274` feature is enabled.
///
/// # Arguments
///
/// * `content_type` - The MIME type of the media file.
///
/// * `filename` - The name of the media file.
///
/// * `source` - The source of the already-uploaded media file.
///
/// * `caption` - An optional caption for the media.
///
/// * `info` - Optional attachment info for the media.
///
/// * `thumbnail` - An optional thumbnail, given as its source and its
///   thumbnail info.
#[cfg(feature = "unstable-msc4274")]
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_gallery_item_type(
    content_type: &Mime,
    filename: String,
    source: MediaSource,
    caption: Option<TextMessageEventContent>,
    info: Option<AttachmentInfo>,
    thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
) -> GalleryItemType {
    make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
}
2866
2867 /// Update the power levels of a select set of users of this room.
2868 ///
2869 /// Issue a `power_levels` state event request to the server, changing the
2870 /// given UserId -> Int levels. May fail if the `power_levels` aren't
2871 /// locally known yet or the server rejects the state event update, e.g.
2872 /// because of insufficient permissions. Neither permissions to update
2873 /// nor whether the data might be stale is checked prior to issuing the
2874 /// request.
2875 pub async fn update_power_levels(
2876 &self,
2877 updates: Vec<(&UserId, Int)>,
2878 ) -> Result<send_state_event::v3::Response> {
2879 let mut power_levels = self.power_levels().await?;
2880
2881 for (user_id, new_level) in updates {
2882 if new_level == power_levels.users_default {
2883 power_levels.users.remove(user_id);
2884 } else {
2885 power_levels.users.insert(user_id.to_owned(), new_level);
2886 }
2887 }
2888
2889 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2890 }
2891
2892 /// Applies a set of power level changes to this room.
2893 ///
2894 /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2895 /// remain unchanged.
2896 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2897 let mut power_levels = self.power_levels().await?;
2898 power_levels.apply(changes)?;
2899 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2900 Ok(())
2901 }
2902
2903 /// Resets the room's power levels to the default values
2904 ///
2905 /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2906 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2907 let creators = self.creators().unwrap_or_default();
2908 let rules = self.clone_info().room_version_rules_or_default();
2909
2910 let default_power_levels =
2911 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2912 let changes = RoomPowerLevelChanges::from(default_power_levels);
2913 self.apply_power_level_changes(changes).await?;
2914 Ok(self.power_levels().await?)
2915 }
2916
2917 /// Gets the suggested role for the user with the provided `user_id`.
2918 ///
2919 /// This method checks the `RoomPowerLevels` events instead of loading the
2920 /// member list and looking for the member.
2921 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2922 let power_level = self.get_user_power_level(user_id).await?;
2923 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2924 }
2925
2926 /// Gets the power level the user with the provided `user_id`.
2927 ///
2928 /// This method checks the `RoomPowerLevels` events instead of loading the
2929 /// member list and looking for the member.
2930 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2931 let event = self.power_levels().await?;
2932 Ok(event.for_user(user_id))
2933 }
2934
2935 /// Gets a map with the `UserId` of users with power levels other than `0`
2936 /// and this power level.
2937 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2938 let power_levels = self.power_levels().await.ok();
2939 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2940 if let Some(power_levels) = power_levels {
2941 for (id, level) in power_levels.users.into_iter() {
2942 user_power_levels.insert(id, level.into());
2943 }
2944 }
2945 user_power_levels
2946 }
2947
2948 /// Sets the name of this room.
2949 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2950 self.send_state_event(RoomNameEventContent::new(name)).await
2951 }
2952
2953 /// Sets a new topic for this room.
2954 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2955 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2956 }
2957
2958 /// Sets the new avatar url for this room.
2959 ///
2960 /// # Arguments
2961 /// * `avatar_url` - The owned Matrix uri that represents the avatar
2962 /// * `info` - The optional image info that can be provided for the avatar
2963 pub async fn set_avatar_url(
2964 &self,
2965 url: &MxcUri,
2966 info: Option<avatar::ImageInfo>,
2967 ) -> Result<send_state_event::v3::Response> {
2968 self.ensure_room_joined()?;
2969
2970 let mut room_avatar_event = RoomAvatarEventContent::new();
2971 room_avatar_event.url = Some(url.to_owned());
2972 room_avatar_event.info = info.map(Box::new);
2973
2974 self.send_state_event(room_avatar_event).await
2975 }
2976
2977 /// Removes the avatar from the room
2978 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2979 self.send_state_event(RoomAvatarEventContent::new()).await
2980 }
2981
2982 /// Uploads a new avatar for this room.
2983 ///
2984 /// # Arguments
2985 /// * `mime` - The mime type describing the data
2986 /// * `data` - The data representation of the avatar
2987 /// * `info` - The optional image info provided for the avatar, the blurhash
2988 /// and the mimetype will always be updated
2989 pub async fn upload_avatar(
2990 &self,
2991 mime: &Mime,
2992 data: Vec<u8>,
2993 info: Option<avatar::ImageInfo>,
2994 ) -> Result<send_state_event::v3::Response> {
2995 self.ensure_room_joined()?;
2996
2997 let upload_response = self.client.media().upload(mime, data, None).await?;
2998 let mut info = info.unwrap_or_default();
2999 info.blurhash = upload_response.blurhash;
3000 info.mimetype = Some(mime.to_string());
3001
3002 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
3003 }
3004
/// Send a state event with an empty state key to the homeserver.
///
/// For state events with a non-empty state key, see
/// [`send_state_event_for_key`][Self::send_state_event_for_key], to which
/// this method delegates using [`EmptyStateKey`].
///
/// This variant is compiled when the
/// `experimental-encrypted-state-events` feature is disabled.
///
/// Returns the parsed response from the server.
///
/// # Arguments
///
/// * `content` - The content of the state event.
///
/// # Examples
///
/// ```no_run
/// # use serde::{Deserialize, Serialize};
/// # async {
/// # let joined_room: matrix_sdk::Room = todo!();
/// use matrix_sdk::ruma::{
///     EventEncryptionAlgorithm,
///     events::{
///         EmptyStateKey, macros::EventContent,
///         room::encryption::RoomEncryptionEventContent,
///     },
/// };
///
/// let encryption_event_content = RoomEncryptionEventContent::new(
///     EventEncryptionAlgorithm::MegolmV1AesSha2,
/// );
/// joined_room.send_state_event(encryption_event_content).await?;
///
/// // Custom event:
/// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
/// #[ruma_event(
///     type = "org.matrix.msc_9000.xxx",
///     kind = State,
///     state_key_type = EmptyStateKey,
/// )]
/// struct XxxStateEventContent {/* fields... */}
///
/// let content: XxxStateEventContent = todo!();
/// joined_room.send_state_event(content).await?;
/// # anyhow::Ok(()) };
/// ```
#[cfg(not(feature = "experimental-encrypted-state-events"))]
#[instrument(skip_all)]
pub async fn send_state_event(
    &self,
    content: impl StateEventContent<StateKey = EmptyStateKey>,
) -> Result<send_state_event::v3::Response> {
    self.send_state_event_for_key(&EmptyStateKey, content).await
}
3056
/// Send a state event with an empty state key to the homeserver.
///
/// For state events with a non-empty state key, see
/// [`send_state_event_for_key`][Self::send_state_event_for_key], to which
/// this method delegates using [`EmptyStateKey`].
///
/// If the experimental state event encryption feature is enabled, this
/// method will transparently encrypt the event if this room is
/// encrypted (except if the event type is considered critical for the room
/// to function, as outlined in [MSC4362][msc4362]).
///
/// This variant is compiled when the
/// `experimental-encrypted-state-events` feature is enabled. It returns a
/// [`SendStateEvent`], which is awaited (as in the example below) to
/// obtain the parsed response from the server.
///
/// # Arguments
///
/// * `content` - The content of the state event.
///
/// # Examples
///
/// ```no_run
/// # use serde::{Deserialize, Serialize};
/// # async {
/// # let joined_room: matrix_sdk::Room = todo!();
/// use matrix_sdk::ruma::{
///     EventEncryptionAlgorithm,
///     events::{
///         EmptyStateKey, macros::EventContent,
///         room::encryption::RoomEncryptionEventContent,
///     },
/// };
///
/// let encryption_event_content = RoomEncryptionEventContent::new(
///     EventEncryptionAlgorithm::MegolmV1AesSha2,
/// );
/// joined_room.send_state_event(encryption_event_content).await?;
///
/// // Custom event:
/// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
/// #[ruma_event(
///     type = "org.matrix.msc_9000.xxx",
///     kind = State,
///     state_key_type = EmptyStateKey,
/// )]
/// struct XxxStateEventContent {/* fields... */}
///
/// let content: XxxStateEventContent = todo!();
/// joined_room.send_state_event(content).await?;
/// # anyhow::Ok(()) };
/// ```
///
/// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/4362-encrypted-state.md
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event<'a>(
    &'a self,
    content: impl StateEventContent<StateKey = EmptyStateKey>,
) -> SendStateEvent<'a> {
    self.send_state_event_for_key(&EmptyStateKey, content)
}
3115
3116 /// Send a state event to the homeserver.
3117 ///
3118 /// Returns the parsed response from the server.
3119 ///
3120 /// # Arguments
3121 ///
3122 /// * `content` - The content of the state event.
3123 ///
3124 /// * `state_key` - A unique key which defines the overwriting semantics for
3125 /// this piece of room state.
3126 ///
3127 /// # Examples
3128 ///
3129 /// ```no_run
3130 /// # use serde::{Deserialize, Serialize};
3131 /// # async {
3132 /// # let joined_room: matrix_sdk::Room = todo!();
3133 /// use matrix_sdk::ruma::{
3134 /// events::{
3135 /// macros::EventContent,
3136 /// room::member::{RoomMemberEventContent, MembershipState},
3137 /// },
3138 /// mxc_uri,
3139 /// };
3140 ///
3141 /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3142 /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3143 /// content.avatar_url = Some(avatar_url);
3144 ///
3145 /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3146 ///
3147 /// // Custom event:
3148 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3149 /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3150 /// struct XxxStateEventContent { /* fields... */ }
3151 ///
3152 /// let content: XxxStateEventContent = todo!();
3153 /// joined_room.send_state_event_for_key("foo", content).await?;
3154 /// # anyhow::Ok(()) };
3155 /// ```
3156 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3157 pub async fn send_state_event_for_key<C, K>(
3158 &self,
3159 state_key: &K,
3160 content: C,
3161 ) -> Result<send_state_event::v3::Response>
3162 where
3163 C: StateEventContent,
3164 C::StateKey: Borrow<K>,
3165 K: AsRef<str> + ?Sized,
3166 {
3167 self.ensure_room_joined()?;
3168 let request =
3169 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3170 let response = self.client.send(request).await?;
3171 Ok(response)
3172 }
3173
/// Send a state event to the homeserver. If state encryption is enabled in
/// this room, the event will be encrypted.
///
/// If the experimental state event encryption feature is enabled, this
/// method will transparently encrypt the event if this room is
/// encrypted (except if the event type is considered critical for the room
/// to function, as outlined in [MSC4362][msc4362]).
///
/// This variant is compiled when the
/// `experimental-encrypted-state-events` feature is enabled. It only
/// constructs and returns a [`SendStateEvent`], which is awaited (as in
/// the example below) to obtain the parsed response from the server.
///
/// # Arguments
///
/// * `state_key` - A unique key which defines the overwriting semantics for
///   this piece of room state.
///
/// * `content` - The content of the state event.
///
/// # Examples
///
/// ```no_run
/// # use serde::{Deserialize, Serialize};
/// # async {
/// # let joined_room: matrix_sdk::Room = todo!();
/// use matrix_sdk::ruma::{
///     events::{
///         macros::EventContent,
///         room::member::{RoomMemberEventContent, MembershipState},
///     },
///     mxc_uri,
/// };
///
/// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
/// let mut content = RoomMemberEventContent::new(MembershipState::Join);
/// content.avatar_url = Some(avatar_url);
///
/// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
///
/// // Custom event:
/// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
/// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
/// struct XxxStateEventContent { /* fields... */ }
///
/// let content: XxxStateEventContent = todo!();
/// joined_room.send_state_event_for_key("foo", content).await?;
/// # anyhow::Ok(()) };
/// ```
///
/// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
#[cfg(feature = "experimental-encrypted-state-events")]
pub fn send_state_event_for_key<'a, C, K>(
    &'a self,
    state_key: &K,
    content: C,
) -> SendStateEvent<'a>
where
    C: StateEventContent,
    C::StateKey: Borrow<K>,
    K: AsRef<str> + ?Sized,
{
    SendStateEvent::new(self, state_key, content)
}
3235
3236 /// Send a raw room state event to the homeserver.
3237 ///
3238 /// Returns the parsed response from the server.
3239 ///
3240 /// # Arguments
3241 ///
3242 /// * `event_type` - The type of the event that we're sending out.
3243 ///
3244 /// * `state_key` - A unique key which defines the overwriting semantics for
3245 /// this piece of room state. This value is often a zero-length string.
3246 ///
3247 /// * `content` - The content of the event as a raw JSON value. The argument
3248 /// type can be `serde_json::Value`, but also other raw JSON types; for
3249 /// the full list check the documentation of [`IntoRawStateEventContent`].
3250 ///
3251 /// # Examples
3252 ///
3253 /// ```no_run
3254 /// use serde_json::json;
3255 ///
3256 /// # async {
3257 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3258 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3259 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3260 ///
3261 /// if let Some(room) = client.get_room(&room_id) {
3262 /// room.send_state_event_raw("m.room.member", "", json!({
3263 /// "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3264 /// "displayname": "Alice Margatroid",
3265 /// "membership": "join",
3266 /// })).await?;
3267 /// }
3268 /// # anyhow::Ok(()) };
3269 /// ```
3270 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3271 #[instrument(skip_all)]
3272 pub async fn send_state_event_raw(
3273 &self,
3274 event_type: &str,
3275 state_key: &str,
3276 content: impl IntoRawStateEventContent,
3277 ) -> Result<send_state_event::v3::Response> {
3278 self.ensure_room_joined()?;
3279
3280 let request = send_state_event::v3::Request::new_raw(
3281 self.room_id().to_owned(),
3282 event_type.into(),
3283 state_key.to_owned(),
3284 content.into_raw_state_event_content(),
3285 );
3286
3287 Ok(self.client.send(request).await?)
3288 }
3289
/// Send a raw room state event to the homeserver.
///
/// If the experimental state event encryption feature is enabled, this
/// method will transparently encrypt the event if this room is
/// encrypted (except if the event type is considered critical for the room
/// to function, as outlined in [MSC4362][msc4362]).
///
/// This variant is compiled when the
/// `experimental-encrypted-state-events` feature is enabled. It only
/// constructs and returns a [`SendRawStateEvent`], which is awaited (as in
/// the example below) to obtain the parsed response from the server.
///
/// # Arguments
///
/// * `event_type` - The type of the event that we're sending out.
///
/// * `state_key` - A unique key which defines the overwriting semantics for
///   this piece of room state. This value is often a zero-length string.
///
/// * `content` - The content of the event as a raw JSON value. The argument
///   type can be `serde_json::Value`, but also other raw JSON types; for
///   the full list check the documentation of [`IntoRawStateEventContent`].
///
/// # Examples
///
/// ```no_run
/// use serde_json::json;
///
/// # async {
/// # let homeserver = url::Url::parse("http://localhost:8080")?;
/// # let mut client = matrix_sdk::Client::new(homeserver).await?;
/// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
///
/// if let Some(room) = client.get_room(&room_id) {
///     room.send_state_event_raw("m.room.member", "", json!({
///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
///         "displayname": "Alice Margatroid",
///         "membership": "join",
///     })).await?;
/// }
/// # anyhow::Ok(()) };
/// ```
///
/// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event_raw<'a>(
    &'a self,
    event_type: &'a str,
    state_key: &'a str,
    content: impl IntoRawStateEventContent,
) -> SendRawStateEvent<'a> {
    SendRawStateEvent::new(self, event_type, state_key, content)
}
3341
3342 /// Strips all information out of an event of the room.
3343 ///
3344 /// Returns the [`redact_event::v3::Response`] from the server.
3345 ///
3346 /// This cannot be undone. Users may redact their own events, and any user
3347 /// with a power level greater than or equal to the redact power level of
3348 /// the room may redact events there.
3349 ///
3350 /// # Arguments
3351 ///
3352 /// * `event_id` - The ID of the event to redact
3353 ///
3354 /// * `reason` - The reason for the event being redacted.
3355 ///
3356 /// * `txn_id` - A unique ID that can be attached to this event as
3357 /// its transaction ID. If not given one is created for the message.
3358 ///
3359 /// # Examples
3360 ///
3361 /// ```no_run
3362 /// use matrix_sdk::ruma::event_id;
3363 ///
3364 /// # async {
3365 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3366 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3367 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3368 /// #
3369 /// if let Some(room) = client.get_room(&room_id) {
3370 /// let event_id = event_id!("$xxxxxx:example.org");
3371 /// let reason = Some("Indecent material");
3372 /// room.redact(&event_id, reason, None).await?;
3373 /// }
3374 /// # anyhow::Ok(()) };
3375 /// ```
3376 #[instrument(skip_all)]
3377 pub async fn redact(
3378 &self,
3379 event_id: &EventId,
3380 reason: Option<&str>,
3381 txn_id: Option<OwnedTransactionId>,
3382 ) -> HttpResult<redact_event::v3::Response> {
3383 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3384 let request = assign!(
3385 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3386 { reason: reason.map(ToOwned::to_owned) }
3387 );
3388
3389 self.client.send(request).await
3390 }
3391
/// Get a list of servers that should know this room.
///
/// Uses the synced members of the room and the suggested [routing
/// algorithm] from the Matrix spec.
///
/// Only the joined members returned by `members_no_sync` are considered.
/// Members whose server is denied by the room's server ACL, or whose
/// server name is an IP literal, are ignored.
///
/// The result is ordered as follows: first the server of the member with
/// the highest power level, provided that level is at least 50; then the
/// other servers, by decreasing number of members.
///
/// Returns at most three servers.
///
/// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
    // Load the server ACL state event, if any. An event that fails to
    // deserialize is treated the same way as a missing one.
    let acl_ev = self
        .get_state_event_static::<RoomServerAclEventContent>()
        .await?
        .and_then(|ev| ev.deserialize().ok());
    // Borrow the ACL content out of the event. For a sync event, the
    // content is only used if `as_original()` returns it; a stripped event
    // always provides its content.
    let acl = acl_ev.as_ref().and_then(|ev| match ev {
        SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
        SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
    });

    // Filter out server names that:
    // - Are blocked due to server ACLs
    // - Are IP addresses
    let members: Vec<_> = self
        .members_no_sync(RoomMemberships::JOIN)
        .await?
        .into_iter()
        .filter(|member| {
            let server = member.user_id().server_name();
            // Without an ACL, every server passes the first check.
            acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
        })
        .collect();

    // Get the server of the highest power level user in the room, provided
    // they are at least power level 50.
    let max = members
        .iter()
        .max_by_key(|member| member.power_level())
        .filter(|max| max.power_level() >= int!(50))
        .map(|member| member.user_id().server_name());

    // Sort the servers by population. The server picked above is left out
    // of the count, so that it doesn't appear twice in the result.
    let servers = members
        .iter()
        .map(|member| member.user_id().server_name())
        .filter(|server| max.filter(|max| max == server).is_none())
        .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
            *servers.entry(server).or_default() += 1;
            servers
        });
    let mut servers: Vec<_> = servers.into_iter().collect();
    // Most populated servers first; the sort is unstable, so servers with
    // the same count have no guaranteed relative order.
    servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));

    // The highest power level server (if any) comes first, followed by the
    // most populated ones, up to three servers in total.
    Ok(max
        .into_iter()
        .chain(servers.into_iter().map(|(name, _)| name))
        .take(3)
        .map(ToOwned::to_owned)
        .collect())
}
3450
3451 /// Get a `matrix.to` permalink to this room.
3452 ///
3453 /// If this room has an alias, we use it. Otherwise, we try to use the
3454 /// synced members in the room for [routing] the room ID.
3455 ///
3456 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3457 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3458 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3459 return Ok(alias.matrix_to_uri());
3460 }
3461
3462 let via = self.route().await?;
3463 Ok(self.room_id().matrix_to_uri_via(via))
3464 }
3465
3466 /// Get a `matrix:` permalink to this room.
3467 ///
3468 /// If this room has an alias, we use it. Otherwise, we try to use the
3469 /// synced members in the room for [routing] the room ID.
3470 ///
3471 /// # Arguments
3472 ///
3473 /// * `join` - Whether the user should join the room.
3474 ///
3475 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3476 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3477 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3478 return Ok(alias.matrix_uri(join));
3479 }
3480
3481 let via = self.route().await?;
3482 Ok(self.room_id().matrix_uri_via(via, join))
3483 }
3484
3485 /// Get a `matrix.to` permalink to an event in this room.
3486 ///
3487 /// We try to use the synced members in the room for [routing] the room ID.
3488 ///
3489 /// *Note*: This method does not check if the given event ID is actually
3490 /// part of this room. It needs to be checked before calling this method
3491 /// otherwise the permalink won't work.
3492 ///
3493 /// # Arguments
3494 ///
3495 /// * `event_id` - The ID of the event.
3496 ///
3497 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3498 pub async fn matrix_to_event_permalink(
3499 &self,
3500 event_id: impl Into<OwnedEventId>,
3501 ) -> Result<MatrixToUri> {
3502 // Don't use the alias because an event is tied to a room ID, but an
3503 // alias might point to another room, e.g. after a room upgrade.
3504 let via = self.route().await?;
3505 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3506 }
3507
3508 /// Get a `matrix:` permalink to an event in this room.
3509 ///
3510 /// We try to use the synced members in the room for [routing] the room ID.
3511 ///
3512 /// *Note*: This method does not check if the given event ID is actually
3513 /// part of this room. It needs to be checked before calling this method
3514 /// otherwise the permalink won't work.
3515 ///
3516 /// # Arguments
3517 ///
3518 /// * `event_id` - The ID of the event.
3519 ///
3520 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3521 pub async fn matrix_event_permalink(
3522 &self,
3523 event_id: impl Into<OwnedEventId>,
3524 ) -> Result<MatrixUri> {
3525 // Don't use the alias because an event is tied to a room ID, but an
3526 // alias might point to another room, e.g. after a room upgrade.
3527 let via = self.route().await?;
3528 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3529 }
3530
3531 /// Get the latest receipt of a user in this room.
3532 ///
3533 /// # Arguments
3534 ///
3535 /// * `receipt_type` - The type of receipt to get.
3536 ///
3537 /// * `thread` - The thread containing the event of the receipt, if any.
3538 ///
3539 /// * `user_id` - The ID of the user.
3540 ///
3541 /// Returns the ID of the event on which the receipt applies and the
3542 /// receipt.
3543 pub async fn load_user_receipt(
3544 &self,
3545 receipt_type: ReceiptType,
3546 thread: ReceiptThread,
3547 user_id: &UserId,
3548 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3549 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3550 }
3551
3552 /// Load the receipts for an event in this room from storage.
3553 ///
3554 /// # Arguments
3555 ///
3556 /// * `receipt_type` - The type of receipt to get.
3557 ///
3558 /// * `thread` - The thread containing the event of the receipt, if any.
3559 ///
3560 /// * `event_id` - The ID of the event.
3561 ///
3562 /// Returns a list of IDs of users who have sent a receipt for the event and
3563 /// the corresponding receipts.
3564 pub async fn load_event_receipts(
3565 &self,
3566 receipt_type: ReceiptType,
3567 thread: ReceiptThread,
3568 event_id: &EventId,
3569 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3570 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3571 }
3572
3573 /// Get the push-condition context for this room.
3574 ///
3575 /// Returns `None` if some data couldn't be found. This should only happen
3576 /// in brand new rooms, while we process its state.
3577 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3578 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions().await?)
3579 .await
3580 }
3581
3582 /// Get the push-condition context for this room, with a choice to include
3583 /// thread subscriptions or not, based on the extra
3584 /// `with_threads_subscriptions` parameter.
3585 ///
3586 /// Returns `None` if some data couldn't be found. This should only happen
3587 /// in brand new rooms, while we process its state.
3588 pub(crate) async fn push_condition_room_ctx_internal(
3589 &self,
3590 with_threads_subscriptions: bool,
3591 ) -> Result<Option<PushConditionRoomCtx>> {
3592 let room_id = self.room_id();
3593 let user_id = self.own_user_id();
3594 let room_info = self.clone_info();
3595 let member_count = room_info.active_members_count();
3596
3597 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3598 member.name().to_owned()
3599 } else {
3600 return Ok(None);
3601 };
3602
3603 let power_levels = match self.power_levels().await {
3604 Ok(power_levels) => Some(power_levels.into()),
3605 Err(error) => {
3606 if matches!(room_info.state(), RoomState::Joined) {
3607 // It's normal to not have the power levels in a non-joined room, so don't log
3608 // the error if the room is not joined
3609 error!("Could not compute power levels for push conditions: {error}");
3610 }
3611 None
3612 }
3613 };
3614
3615 let mut ctx = assign!(PushConditionRoomCtx::new(
3616 room_id.to_owned(),
3617 UInt::new(member_count).unwrap_or(UInt::MAX),
3618 user_id.to_owned(),
3619 user_display_name,
3620 ),
3621 {
3622 power_levels,
3623 });
3624
3625 if with_threads_subscriptions {
3626 let this = self.clone();
3627 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3628 let room = this.clone();
3629 Box::pin(async move {
3630 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3631 maybe_sub.is_some()
3632 } else {
3633 false
3634 }
3635 })
3636 });
3637 }
3638
3639 Ok(Some(ctx))
3640 }
3641
3642 /// Retrieves a [`PushContext`] that can be used to compute the push
3643 /// actions for events.
3644 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3645 self.push_context_internal(self.client.enabled_thread_subscriptions().await?).await
3646 }
3647
3648 /// Retrieves a [`PushContext`] that can be used to compute the push actions
3649 /// for events, with a choice to include thread subscriptions or not,
3650 /// based on the extra `with_threads_subscriptions` parameter.
3651 #[instrument(skip(self))]
3652 pub(crate) async fn push_context_internal(
3653 &self,
3654 with_threads_subscriptions: bool,
3655 ) -> Result<Option<PushContext>> {
3656 let Some(push_condition_room_ctx) =
3657 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3658 else {
3659 debug!("Could not aggregate push context");
3660 return Ok(None);
3661 };
3662 let push_rules = self.client().account().push_rules().await?;
3663 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3664 }
3665
3666 /// Get the push actions for the given event with the current room state.
3667 ///
3668 /// Note that it is possible that no push action is returned because the
3669 /// current room state does not have all the required state events.
3670 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3671 if let Some(ctx) = self.push_context().await? {
3672 Ok(Some(ctx.for_event(event).await))
3673 } else {
3674 Ok(None)
3675 }
3676 }
3677
3678 /// The membership details of the (latest) invite for the logged-in user in
3679 /// this room.
3680 pub async fn invite_details(&self) -> Result<Invite> {
3681 let state = self.state();
3682
3683 if state != RoomState::Invited {
3684 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3685 }
3686
3687 let invitee = self
3688 .get_member_no_sync(self.own_user_id())
3689 .await?
3690 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3691 let event = invitee.event();
3692
3693 let inviter_id = event.sender().to_owned();
3694 let inviter = self.get_member_no_sync(&inviter_id).await?;
3695
3696 Ok(Invite { invitee, inviter_id, inviter })
3697 }
3698
3699 /// Get the membership details for the current user.
3700 ///
3701 /// Returns:
3702 /// - If the user was present in the room, a
3703 /// [`RoomMemberWithSenderInfo`] containing both the user info and the
3704 /// member info of the sender of the `m.room.member` event.
3705 /// - If the current user is not present, an error.
3706 pub async fn member_with_sender_info(
3707 &self,
3708 user_id: &UserId,
3709 ) -> Result<RoomMemberWithSenderInfo> {
3710 let Some(member) = self.get_member_no_sync(user_id).await? else {
3711 return Err(Error::InsufficientData);
3712 };
3713
3714 let sender_member =
3715 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3716 // If the sender room member info is already available, return it
3717 Some(member)
3718 } else if self.are_members_synced() {
3719 // The room members are synced and we couldn't find the sender info
3720 None
3721 } else if self.sync_members().await.is_ok() {
3722 // Try getting the sender room member info again after syncing
3723 self.get_member_no_sync(member.event().sender()).await?
3724 } else {
3725 None
3726 };
3727
3728 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3729 }
3730
3731 /// Forget this room.
3732 ///
3733 /// This communicates to the homeserver that it should forget the room.
3734 ///
3735 /// Only left or banned-from rooms can be forgotten.
3736 pub async fn forget(&self) -> Result<()> {
3737 let state = self.state();
3738 match state {
3739 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3740 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3741 "Left / Banned",
3742 state,
3743 ))));
3744 }
3745 RoomState::Left | RoomState::Banned => {}
3746 }
3747
3748 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3749 let _response = self.client.send(request).await?;
3750
3751 // If it was a DM, remove the room from the `m.direct` global account data.
3752 if self.inner.direct_targets_length() != 0
3753 && let Err(e) = self.set_is_direct(false).await
3754 {
3755 // It is not important whether we managed to remove the room, it will not have
3756 // any consequences, so just log the error.
3757 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3758 }
3759
3760 self.client.base_client().forget_room(self.inner.room_id()).await?;
3761
3762 Ok(())
3763 }
3764
3765 fn ensure_room_joined(&self) -> Result<()> {
3766 let state = self.state();
3767 if state == RoomState::Joined {
3768 Ok(())
3769 } else {
3770 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3771 }
3772 }
3773
3774 /// Get the notification mode.
3775 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3776 if !matches!(self.state(), RoomState::Joined) {
3777 return None;
3778 }
3779
3780 let notification_settings = self.client().notification_settings().await;
3781
3782 // Get the user-defined mode if available
3783 let notification_mode =
3784 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3785
3786 if notification_mode.is_some() {
3787 notification_mode
3788 } else if let Ok(is_encrypted) =
3789 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3790 {
3791 // Otherwise, if encrypted status is available, get the default mode for this
3792 // type of room.
3793 // From the point of view of notification settings, a `one-to-one` room is one
3794 // that involves exactly two people.
3795 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3796 let default_mode = notification_settings
3797 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3798 .await;
3799 Some(default_mode)
3800 } else {
3801 None
3802 }
3803 }
3804
3805 /// Get the user-defined notification mode.
3806 ///
3807 /// The result is cached for fast and non-async call. To read the cached
3808 /// result, use
3809 /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3810 //
3811 // Note for maintainers:
3812 //
3813 // The fact the result is cached is an important property. If you change that in
3814 // the future, please review all calls to this method.
3815 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3816 if !matches!(self.state(), RoomState::Joined) {
3817 return None;
3818 }
3819
3820 let notification_settings = self.client().notification_settings().await;
3821
3822 // Get the user-defined mode if available.
3823 let mode =
3824 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3825
3826 if let Some(mode) = mode {
3827 self.update_cached_user_defined_notification_mode(mode);
3828 }
3829
3830 mode
3831 }
3832
3833 /// Report an event as inappropriate to the homeserver's administrator.
3834 ///
3835 /// # Arguments
3836 ///
3837 /// * `event_id` - The ID of the event to report.
3838 /// * `score` - The score to rate this content.
3839 /// * `reason` - The reason the content is being reported.
3840 ///
3841 /// # Errors
3842 ///
3843 /// Returns an error if the room is not joined or if an error occurs with
3844 /// the request.
3845 pub async fn report_content(
3846 &self,
3847 event_id: OwnedEventId,
3848 reason: Option<String>,
3849 ) -> Result<report_content::v3::Response> {
3850 let state = self.state();
3851 if state != RoomState::Joined {
3852 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3853 }
3854
3855 let request = assign!(
3856 report_content::v3::Request::new(
3857 self.inner.room_id().to_owned(),
3858 event_id,
3859 ), {
3860 reason: reason
3861 }
3862 );
3863 Ok(self.client.send(request).await?)
3864 }
3865
3866 /// Reports a room as inappropriate to the server.
3867 /// The caller is not required to be joined to the room to report it.
3868 ///
3869 /// # Arguments
3870 ///
3871 /// * `reason` - The reason the room is being reported.
3872 ///
3873 /// # Errors
3874 ///
3875 /// Returns an error if the room is not found or on rate limit
3876 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3877 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3878
3879 Ok(self.client.send(request).await?)
3880 }
3881
3882 /// Set a flag on the room to indicate that the user has explicitly marked
3883 /// it as (un)read.
3884 ///
3885 /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3886 /// value as `unread`.
3887 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3888 if self.is_marked_unread() == unread {
3889 // The request is not necessary.
3890 return Ok(());
3891 }
3892
3893 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3894
3895 let content = MarkedUnreadEventContent::new(unread);
3896
3897 let request = set_room_account_data::v3::Request::new(
3898 user_id.to_owned(),
3899 self.inner.room_id().to_owned(),
3900 &content,
3901 )?;
3902
3903 self.client.send(request).await?;
3904 Ok(())
3905 }
3906
3907 /// Returns the [`RoomEventCache`] associated to this room, assuming the
3908 /// global [`EventCache`] has been enabled for subscription.
3909 pub async fn event_cache(
3910 &self,
3911 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3912 self.client.event_cache().for_room(self.room_id()).await
3913 }
3914
3915 /// Get the beacon information event in the room for the `user_id`.
3916 ///
3917 /// # Errors
3918 ///
3919 /// Returns an error if the event is redacted, stripped, not found or could
3920 /// not be deserialized.
3921 pub(crate) async fn get_user_beacon_info(
3922 &self,
3923 user_id: &UserId,
3924 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3925 let raw_event = self
3926 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3927 .await?
3928 .ok_or(BeaconError::NotFound)?;
3929
3930 match raw_event.deserialize()? {
3931 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3932 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3933 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3934 }
3935 }
3936
3937 /// Start sharing live location in the room.
3938 ///
3939 /// # Arguments
3940 ///
3941 /// * `duration_millis` - The duration for which the live location is
3942 /// shared, in milliseconds.
3943 /// * `description` - An optional description for the live location share.
3944 ///
3945 /// # Errors
3946 ///
3947 /// Returns an error if the room is not joined or if the state event could
3948 /// not be sent.
3949 pub async fn start_live_location_share(
3950 &self,
3951 duration_millis: u64,
3952 description: Option<String>,
3953 ) -> Result<send_state_event::v3::Response> {
3954 self.ensure_room_joined()?;
3955
3956 self.send_state_event_for_key(
3957 self.own_user_id(),
3958 BeaconInfoEventContent::new(
3959 description,
3960 Duration::from_millis(duration_millis),
3961 true,
3962 None,
3963 ),
3964 )
3965 .await
3966 }
3967
3968 /// Stop sharing live location in the room.
3969 ///
3970 /// # Errors
3971 ///
3972 /// Returns an error if the room is not joined, if the beacon information
3973 /// is redacted or stripped, or if the state event is not found.
3974 pub async fn stop_live_location_share(
3975 &self,
3976 ) -> Result<send_state_event::v3::Response, BeaconError> {
3977 self.ensure_room_joined()?;
3978
3979 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3980 beacon_info_event.content.stop();
3981 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3982 }
3983
3984 /// Send a location beacon event in the current room.
3985 ///
3986 /// # Arguments
3987 ///
3988 /// * `geo_uri` - The geo URI of the location beacon.
3989 ///
3990 /// # Errors
3991 ///
3992 /// Returns an error if the room is not joined, if the beacon information
3993 /// is redacted or stripped, if the location share is no longer live,
3994 /// or if the state event is not found.
3995 pub async fn send_location_beacon(
3996 &self,
3997 geo_uri: String,
3998 ) -> Result<send_message_event::v3::Response, BeaconError> {
3999 self.ensure_room_joined()?;
4000
4001 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
4002
4003 if beacon_info_event.content.is_live() {
4004 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
4005 Ok(self.send(content).await?.response)
4006 } else {
4007 Err(BeaconError::NotLive)
4008 }
4009 }
4010
4011 /// Store the given `ComposerDraft` in the state store using the current
4012 /// room id and optional thread root id as identifier.
4013 pub async fn save_composer_draft(
4014 &self,
4015 draft: ComposerDraft,
4016 thread_root: Option<&EventId>,
4017 ) -> Result<()> {
4018 self.client
4019 .state_store()
4020 .set_kv_data(
4021 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
4022 StateStoreDataValue::ComposerDraft(draft),
4023 )
4024 .await?;
4025 Ok(())
4026 }
4027
4028 /// Retrieve the `ComposerDraft` stored in the state store for this room
4029 /// and given thread, if any.
4030 pub async fn load_composer_draft(
4031 &self,
4032 thread_root: Option<&EventId>,
4033 ) -> Result<Option<ComposerDraft>> {
4034 let data = self
4035 .client
4036 .state_store()
4037 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4038 .await?;
4039 Ok(data.and_then(|d| d.into_composer_draft()))
4040 }
4041
4042 /// Remove the `ComposerDraft` stored in the state store for this room
4043 /// and given thread, if any.
4044 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
4045 self.client
4046 .state_store()
4047 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4048 .await?;
4049 Ok(())
4050 }
4051
4052 /// Load pinned state events for a room from the `/state` endpoint in the
4053 /// home server.
4054 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
4055 let response = self
4056 .client
4057 .send(get_state_event_for_key::v3::Request::new(
4058 self.room_id().to_owned(),
4059 StateEventType::RoomPinnedEvents,
4060 "".to_owned(),
4061 ))
4062 .await;
4063
4064 match response {
4065 Ok(response) => Ok(Some(
4066 response
4067 .into_content()
4068 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
4069 .pinned,
4070 )),
4071 Err(http_error) => match http_error.as_client_api_error() {
4072 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
4073 _ => Err(http_error.into()),
4074 },
4075 }
4076 }
4077
4078 /// Observe live location sharing events for this room.
4079 ///
4080 /// The returned observable will receive the newest event for each sync
4081 /// response that contains an `m.beacon` event.
4082 ///
4083 /// Returns a stream of [`ObservableLiveLocation`] events from other users
4084 /// in the room, excluding the live location events of the room's own user.
4085 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
4086 ObservableLiveLocation::new(&self.client, self.room_id())
4087 }
4088
4089 /// Subscribe to knock requests in this `Room`.
4090 ///
4091 /// The current requests to join the room will be emitted immediately
4092 /// when subscribing.
4093 ///
4094 /// A new set of knock requests will be emitted whenever:
4095 /// - A new member event is received.
4096 /// - A knock request is marked as seen.
4097 /// - A sync is gappy (limited), so room membership information may be
4098 /// outdated.
4099 ///
4100 /// Returns both a stream of knock requests and a handle for a task that
4101 /// will clean up the seen knock request ids when possible.
4102 pub async fn subscribe_to_knock_requests(
4103 &self,
4104 ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
4105 let this = Arc::new(self.clone());
4106
4107 let room_member_events_observer =
4108 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
4109
4110 let current_seen_ids = self.get_seen_knock_request_ids().await?;
4111 let mut seen_request_ids_stream = self
4112 .seen_knock_request_ids_map
4113 .subscribe()
4114 .await
4115 .map(|values| values.unwrap_or_default());
4116
4117 let mut room_info_stream = self.subscribe_info();
4118
4119 // Spawn a task that will clean up the seen knock request ids when updated room
4120 // members are received
4121 let clear_seen_ids_handle = spawn({
4122 let this = self.clone();
4123 async move {
4124 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
4125 while member_updates_stream.recv().await.is_ok() {
4126 // If room members were updated, try to remove outdated seen knock request ids
4127 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
4128 warn!("Failed to remove seen knock requests: {err}")
4129 }
4130 }
4131 }
4132 });
4133
4134 let combined_stream = stream! {
4135 // Emit current requests to join
4136 match this.get_current_join_requests(¤t_seen_ids).await {
4137 Ok(initial_requests) => yield initial_requests,
4138 Err(err) => warn!("Failed to get initial requests to join: {err}")
4139 }
4140
4141 let mut requests_stream = room_member_events_observer.subscribe();
4142 let mut seen_ids = current_seen_ids.clone();
4143
4144 loop {
4145 // This is equivalent to a combine stream operation, triggering a new emission
4146 // when any of the branches changes
4147 tokio::select! {
4148 Some((event, _)) = requests_stream.next() => {
4149 if let Some(event) = event.as_original() {
4150 // If we can calculate the membership change, try to emit only when needed
4151 let emit = if event.prev_content().is_some() {
4152 matches!(event.membership_change(),
4153 MembershipChange::Banned |
4154 MembershipChange::Knocked |
4155 MembershipChange::KnockAccepted |
4156 MembershipChange::KnockDenied |
4157 MembershipChange::KnockRetracted
4158 )
4159 } else {
4160 // If we can't calculate the membership change, assume we need to
4161 // emit updated values
4162 true
4163 };
4164
4165 if emit {
4166 match this.get_current_join_requests(&seen_ids).await {
4167 Ok(requests) => yield requests,
4168 Err(err) => {
4169 warn!("Failed to get updated knock requests on new member event: {err}")
4170 }
4171 }
4172 }
4173 }
4174 }
4175
4176 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4177 // Update the current seen ids
4178 seen_ids = new_seen_ids;
4179
4180 // If seen requests have changed we need to recalculate
4181 // all the knock requests
4182 match this.get_current_join_requests(&seen_ids).await {
4183 Ok(requests) => yield requests,
4184 Err(err) => {
4185 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4186 }
4187 }
4188 }
4189
4190 Some(room_info) = room_info_stream.next() => {
4191 // We need to emit new items when we may have missing room members:
4192 // this usually happens after a gappy (limited) sync
4193 if !room_info.are_members_synced() {
4194 match this.get_current_join_requests(&seen_ids).await {
4195 Ok(requests) => yield requests,
4196 Err(err) => {
4197 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4198 }
4199 }
4200 }
4201 }
4202 // If the streams in all branches are closed, stop the loop
4203 else => break,
4204 }
4205 }
4206 };
4207
4208 Ok((combined_stream, clear_seen_ids_handle))
4209 }
4210
4211 async fn get_current_join_requests(
4212 &self,
4213 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4214 ) -> Result<Vec<KnockRequest>> {
4215 Ok(self
4216 .members(RoomMemberships::KNOCK)
4217 .await?
4218 .into_iter()
4219 .filter_map(|member| {
4220 let event_id = member.event().event_id()?;
4221 Some(KnockRequest::new(
4222 self,
4223 event_id,
4224 member.event().timestamp(),
4225 KnockRequestMemberInfo::from_member(&member),
4226 seen_request_ids.contains_key(event_id),
4227 ))
4228 })
4229 .collect())
4230 }
4231
4232 /// Access the room settings related to privacy and visibility.
4233 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4234 RoomPrivacySettings::new(&self.inner, &self.client)
4235 }
4236
4237 /// Retrieve a list of all the threads for the current room.
4238 ///
4239 /// Since this client-server API is paginated, the return type may include a
4240 /// token used to resuming back-pagination into the list of results, in
4241 /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4242 /// [`ListThreadsOptions::from`] to continue the pagination
4243 /// from the previous position.
4244 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4245 let request = opts.into_request(self.room_id());
4246
4247 let response = self.client.send(request).await?;
4248
4249 let push_ctx = self.push_context().await?;
4250 let chunk = join_all(
4251 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4252 )
4253 .await;
4254
4255 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4256 }
4257
4258 /// Retrieve a list of relations for the given event, according to the given
4259 /// options, using the network.
4260 ///
4261 /// Since this client-server API is paginated, the return type may include a
4262 /// token used to resuming back-pagination into the list of results, in
4263 /// [`Relations::prev_batch_token`]. This token can be fed back into
4264 /// [`RelationsOptions::from`] to continue the pagination from the previous
4265 /// position.
4266 ///
4267 /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4268 /// then it must be used with the same
4269 /// [`RelationsOptions::include_relations`] value as the request that
4270 /// returns the `from` token, otherwise the server behavior is undefined.
4271 pub async fn relations(
4272 &self,
4273 event_id: OwnedEventId,
4274 opts: RelationsOptions,
4275 ) -> Result<Relations> {
4276 let relations = opts.send(self, event_id).await;
4277
4278 // Save any new related events to the cache.
4279 if let Ok(Relations { chunk, .. }) = &relations
4280 && let Ok((cache, _handles)) = self.event_cache().await
4281 {
4282 cache.save_events(chunk.clone()).await;
4283 }
4284
4285 relations
4286 }
4287
4288 /// Search this room's [`RoomIndex`] for query and return at most
4289 /// max_number_of_results results.
4290 #[cfg(feature = "experimental-search")]
4291 pub async fn search(
4292 &self,
4293 query: &str,
4294 max_number_of_results: usize,
4295 pagination_offset: Option<usize>,
4296 ) -> Result<Vec<OwnedEventId>, IndexError> {
4297 let mut search_index_guard = self.client.search_index().lock().await;
4298 search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4299 }
4300
4301 /// Subscribe to a given thread in this room.
4302 ///
4303 /// This will subscribe the user to the thread, so that they will receive
4304 /// notifications for that thread specifically.
4305 ///
4306 /// # Arguments
4307 ///
4308 /// - `thread_root`: The ID of the thread root event to subscribe to.
4309 /// - `automatic`: Whether the subscription was made automatically by a
4310 /// client, not by manual user choice. If set, must include the latest
4311 /// event ID that's known in the thread and that is causing the automatic
4312 /// subscription. If unset (i.e. we're now subscribing manually) and there
4313 /// was a previous automatic subscription, the subscription will be
4314 /// overridden to a manual one instead.
4315 ///
4316 /// # Returns
4317 ///
4318 /// - A 404 error if the event isn't known, or isn't a thread root.
4319 /// - An `Ok` result if the subscription was successful, or if the server
4320 /// skipped an automatic subscription (as the user unsubscribed from the
4321 /// thread after the event causing the automatic subscription).
4322 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4323 pub async fn subscribe_thread(
4324 &self,
4325 thread_root: OwnedEventId,
4326 automatic: Option<OwnedEventId>,
4327 ) -> Result<()> {
4328 let is_automatic = automatic.is_some();
4329
4330 match self
4331 .client
4332 .send(subscribe_thread::unstable::Request::new(
4333 self.room_id().to_owned(),
4334 thread_root.clone(),
4335 automatic,
4336 ))
4337 .await
4338 {
4339 Ok(_response) => {
4340 trace!("Server acknowledged the thread subscription; saving in db");
4341
4342 // Immediately save the result into the database.
4343 self.client
4344 .state_store()
4345 .upsert_thread_subscriptions(vec![(
4346 self.room_id(),
4347 &thread_root,
4348 StoredThreadSubscription {
4349 status: ThreadSubscriptionStatus::Subscribed {
4350 automatic: is_automatic,
4351 },
4352 bump_stamp: None,
4353 },
4354 )])
4355 .await?;
4356
4357 Ok(())
4358 }
4359
4360 Err(err) => {
4361 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4362 // In this case: the server indicates that the user unsubscribed *after* the
4363 // event ID we've used in an automatic subscription; don't
4364 // save the subscription state in the database, as the
4365 // previous one should be more correct.
4366 trace!("Thread subscription skipped: {err}");
4367 Ok(())
4368 } else {
4369 // Forward the error to the caller.
4370 Err(err.into())
4371 }
4372 }
4373 }
4374 }
4375
4376 /// Subscribe to a thread if needed, based on a current subscription to it.
4377 ///
4378 /// This is like [`Self::subscribe_thread`], but it first checks if the user
4379 /// has already subscribed to a thread, so as to minimize sending
4380 /// unnecessary subscriptions which would be ignored by the server.
4381 pub async fn subscribe_thread_if_needed(
4382 &self,
4383 thread_root: &EventId,
4384 automatic: Option<OwnedEventId>,
4385 ) -> Result<()> {
4386 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4387 // If we have a previous subscription, we should only send the new one if it's
4388 // manual and the previous one was automatic.
4389 if !prev_sub.automatic || automatic.is_some() {
4390 // Either we had already a manual subscription, or we had an automatic one and
4391 // the new one is automatic too: nothing to do!
4392 return Ok(());
4393 }
4394 }
4395 self.subscribe_thread(thread_root.to_owned(), automatic).await
4396 }
4397
4398 /// Unsubscribe from a given thread in this room.
4399 ///
4400 /// # Arguments
4401 ///
4402 /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4403 ///
4404 /// # Returns
4405 ///
4406 /// - An `Ok` result if the unsubscription was successful, or the thread was
4407 /// already unsubscribed.
4408 /// - A 404 error if the event isn't known, or isn't a thread root.
4409 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4410 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4411 self.client
4412 .send(unsubscribe_thread::unstable::Request::new(
4413 self.room_id().to_owned(),
4414 thread_root.clone(),
4415 ))
4416 .await?;
4417
4418 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4419
4420 // Immediately save the result into the database.
4421 self.client
4422 .state_store()
4423 .upsert_thread_subscriptions(vec![(
4424 self.room_id(),
4425 &thread_root,
4426 StoredThreadSubscription {
4427 status: ThreadSubscriptionStatus::Unsubscribed,
4428 bump_stamp: None,
4429 },
4430 )])
4431 .await?;
4432
4433 Ok(())
4434 }
4435
4436 /// Return the current thread subscription for the given thread root in this
4437 /// room.
4438 ///
4439 /// # Arguments
4440 ///
4441 /// - `thread_root`: The ID of the thread root event to get the subscription
4442 /// for.
4443 ///
4444 /// # Returns
4445 ///
4446 /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4447 /// subscription information.
4448 /// - An `Ok` result with `None` if the subscription does not exist, or the
4449 /// event couldn't be found, or the event isn't a thread.
4450 /// - An error if the request fails for any other reason, such as a network
4451 /// error.
4452 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4453 pub async fn fetch_thread_subscription(
4454 &self,
4455 thread_root: OwnedEventId,
4456 ) -> Result<Option<ThreadSubscription>> {
4457 let result = self
4458 .client
4459 .send(get_thread_subscription::unstable::Request::new(
4460 self.room_id().to_owned(),
4461 thread_root.clone(),
4462 ))
4463 .await;
4464
4465 let subscription = match result {
4466 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4467 Err(http_error) => match http_error.as_client_api_error() {
4468 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4469 _ => return Err(http_error.into()),
4470 },
4471 };
4472
4473 // Keep the database in sync.
4474 if let Some(sub) = &subscription {
4475 self.client
4476 .state_store()
4477 .upsert_thread_subscriptions(vec![(
4478 self.room_id(),
4479 &thread_root,
4480 StoredThreadSubscription {
4481 status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4482 bump_stamp: None,
4483 },
4484 )])
4485 .await?;
4486 } else {
4487 // If the subscription was not found, remove it from the database.
4488 self.client
4489 .state_store()
4490 .remove_thread_subscription(self.room_id(), &thread_root)
4491 .await?;
4492 }
4493
4494 Ok(subscription)
4495 }
4496
4497 /// Return the current thread subscription for the given thread root in this
4498 /// room, by getting it from storage if possible, or fetching it from
4499 /// network otherwise.
4500 ///
4501 /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4502 /// this method.
4503 pub async fn load_or_fetch_thread_subscription(
4504 &self,
4505 thread_root: &EventId,
4506 ) -> Result<Option<ThreadSubscription>> {
4507 // If the thread subscriptions list is outdated, fetch from the server.
4508 if self.client.thread_subscription_catchup().is_outdated() {
4509 return self.fetch_thread_subscription(thread_root.to_owned()).await;
4510 }
4511
4512 // Otherwise, we can rely on the store information.
4513 Ok(self
4514 .client
4515 .state_store()
4516 .load_thread_subscription(self.room_id(), thread_root)
4517 .await
4518 .map(|maybe_sub| {
4519 maybe_sub.and_then(|stored| match stored.status {
4520 ThreadSubscriptionStatus::Unsubscribed => None,
4521 ThreadSubscriptionStatus::Subscribed { automatic } => {
4522 Some(ThreadSubscription { automatic })
4523 }
4524 })
4525 })?)
4526 }
4527
4528 /// Adds a new pinned event by sending an updated `m.room.pinned_events`
4529 /// event containing the new event id.
4530 ///
4531 /// This method will first try to get the pinned events from the current
4532 /// room's state and if it fails to do so it'll try to load them from the
4533 /// homeserver.
4534 ///
4535 /// Returns `true` if we pinned the event, `false` if the event was already
4536 /// pinned.
4537 pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
4538 let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4539 event_ids
4540 } else {
4541 self.load_pinned_events().await?.unwrap_or_default()
4542 };
4543 let event_id = event_id.to_owned();
4544 if pinned_event_ids.contains(&event_id) {
4545 Ok(false)
4546 } else {
4547 pinned_event_ids.push(event_id);
4548 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4549 self.send_state_event(content).await?;
4550 Ok(true)
4551 }
4552 }
4553
4554 /// Removes a pinned event by sending an updated `m.room.pinned_events`
4555 /// event without the event id we want to remove.
4556 ///
4557 /// This method will first try to get the pinned events from the current
4558 /// room's state and if it fails to do so it'll try to load them from the
4559 /// homeserver.
4560 ///
4561 /// Returns `true` if we unpinned the event, `false` if the event wasn't
4562 /// pinned before.
4563 pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
4564 let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4565 event_ids
4566 } else {
4567 self.load_pinned_events().await?.unwrap_or_default()
4568 };
4569 let event_id = event_id.to_owned();
4570 if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
4571 pinned_event_ids.remove(idx);
4572 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4573 self.send_state_event(content).await?;
4574 Ok(true)
4575 } else {
4576 Ok(false)
4577 }
4578 }
4579}
4580
#[cfg(feature = "e2e-encryption")]
impl RoomIdentityProvider for Room {
    /// Whether `user_id` resolves to a member of this room.
    ///
    /// A failed member lookup is treated the same as "not a member".
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// The identities of the joined and invited members of this room.
    ///
    /// Members without an available identity are skipped; if the member list
    /// cannot be loaded, the result is empty.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                // `extend` with an `Option` pushes the identity only when present.
                identities.extend(self.user_identity(member.user_id()).await);
            }
            identities
        })
    }

    /// The underlying identity of `user_id`, or `None` if it is unknown or
    /// the lookup failed.
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            let identity =
                self.client.encryption().get_user_identity(user_id).await.ok().flatten()?;
            Some(identity.underlying_identity())
        })
    }
}
4615
/// A wrapper for a weak client and a room id that allows lazily retrieving a
/// room, only when needed.
///
/// See [`WeakRoom::get`] for the actual lookup.
#[derive(Clone, Debug)]
pub(crate) struct WeakRoom {
    /// Weak handle to the client used to look up the room.
    client: WeakClient,
    /// The id of the room to look up.
    room_id: OwnedRoomId,
}
4623
4624impl WeakRoom {
4625 /// Create a new `WeakRoom` given its weak components.
4626 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4627 Self { client, room_id }
4628 }
4629
4630 /// Attempts to reconstruct the room.
4631 pub fn get(&self) -> Option<Room> {
4632 self.client.get().and_then(|client| client.get_room(&self.room_id))
4633 }
4634
4635 /// The room id for that room.
4636 pub fn room_id(&self) -> &RoomId {
4637 &self.room_id
4638 }
4639}
4640
/// Details of the (latest) invite.
#[derive(Debug, Clone)]
pub struct Invite {
    /// Who has been invited.
    pub invitee: RoomMember,

    /// The user ID of who sent the invite.
    ///
    /// This is useful if [`Self::inviter`] is `None`.
    pub inviter_id: OwnedUserId,

    /// Who sent the invite.
    ///
    /// If `None`, check [`Self::inviter_id`], it might be useful as a
    /// fallback.
    pub inviter: Option<RoomMember>,
}
4657
/// Errors related to invitations.
// NOTE(review): presumably raised while building an `Invite`; the call site
// is outside this section of the file, so confirm before relying on it.
#[derive(Error, Debug)]
enum InvitationError {
    /// No membership event was found.
    #[error("No membership event found")]
    EventMissing,
}
4663
/// Receipts to send all at once.
///
/// Every field is optional; a value created with [`Receipts::new`] has none
/// of them set (see [`Receipts::is_empty`]).
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct Receipts {
    /// Fully-read marker (room account data).
    pub fully_read: Option<OwnedEventId>,
    /// Read receipt (public ephemeral room event).
    pub public_read_receipt: Option<OwnedEventId>,
    /// Read receipt (private ephemeral room event).
    pub private_read_receipt: Option<OwnedEventId>,
}
4675
4676impl Receipts {
4677 /// Create an empty `Receipts`.
4678 pub fn new() -> Self {
4679 Self::default()
4680 }
4681
4682 /// Set the last event the user has read.
4683 ///
4684 /// It means that the user has read all the events before this event.
4685 ///
4686 /// This is a private marker only visible by the user.
4687 ///
4688 /// Note that this is technically not a receipt as it is persisted in the
4689 /// room account data.
4690 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4691 self.fully_read = event_id.into();
4692 self
4693 }
4694
4695 /// Set the last event presented to the user and forward it to the other
4696 /// users in the room.
4697 ///
4698 /// This is used to reset the unread messages/notification count and
4699 /// advertise to other users the last event that the user has likely seen.
4700 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4701 self.public_read_receipt = event_id.into();
4702 self
4703 }
4704
4705 /// Set the last event presented to the user and don't forward it.
4706 ///
4707 /// This is used to reset the unread messages/notification count.
4708 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4709 self.private_read_receipt = event_id.into();
4710 self
4711 }
4712
4713 /// Whether this `Receipts` is empty.
4714 pub fn is_empty(&self) -> bool {
4715 self.fully_read.is_none()
4716 && self.public_read_receipt.is_none()
4717 && self.private_read_receipt.is_none()
4718 }
4719}
4720
/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
/// listed by a room, possibly validated by checking the space's state.
#[derive(Debug)]
pub enum ParentSpace {
    /// The room recognizes the given room as its parent, and the parent
    /// recognizes it as its child.
    Reciprocal(Room),
    /// The room recognizes the given room as its parent, but the parent does
    /// not recognize it as its child. However, the author of the
    /// `m.space.parent` event in the room has a sufficient power level in the
    /// parent to create the child event.
    WithPowerlevel(Room),
    /// The room recognizes the given room as its parent, but the parent does
    /// not recognize it as its child.
    Illegitimate(Room),
    /// The room recognizes the given id as its parent room, but we cannot check
    /// whether the parent recognizes it as its child.
    Unverifiable(OwnedRoomId),
}
4740
/// Something an event can be retrieved from, given its event id.
trait EventSource {
    /// Retrieves the [`TimelineEvent`] identified by `event_id`.
    ///
    /// The returned future resolves to the event, or to an [`Error`] if it
    /// couldn't be retrieved. It is required to be [`SendOutsideWasm`].
    fn get_event(
        &self,
        event_id: &EventId,
    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
}
4747
4748impl EventSource for &Room {
4749 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4750 self.load_or_fetch_event(event_id, None).await
4751 }
4752}
4753
/// Contains the current user's room member info and the optional room member
/// info of the sender of the `m.room.member` event that this info represents.
#[derive(Debug)]
pub struct RoomMemberWithSenderInfo {
    /// The actual room member.
    pub room_member: RoomMember,
    /// The info of the sender of the event [`Self::room_member`] is based on,
    /// if available.
    pub sender_info: Option<RoomMember>,
}
4764
4765#[cfg(all(test, not(target_family = "wasm")))]
4766mod tests {
4767 use std::collections::BTreeMap;
4768
4769 use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4770 use matrix_sdk_test::{
4771 JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
4772 };
4773 use ruma::{
4774 RoomVersionId, event_id,
4775 events::{relation::RelationType, room::member::MembershipState},
4776 owned_event_id, room_id, user_id,
4777 };
4778 use wiremock::{
4779 Mock, MockServer, ResponseTemplate,
4780 matchers::{header, method, path_regex},
4781 };
4782
4783 use crate::{
4784 Client,
4785 config::RequestConfig,
4786 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4787 test_utils::{
4788 client::mock_matrix_session,
4789 logged_in_client,
4790 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4791 },
4792 };
4793
    // Checks that encrypting a room event still succeeds after another client,
    // sharing the same sqlite store, has taken the cross-process crypto-store
    // lock in between.
    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
    #[async_test]
    async fn test_cache_invalidation_while_encrypt() {
        use matrix_sdk_base::store::RoomLoadSettings;
        use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};

        // NOTE(review): this is a fixed path in the temp dir and the test
        // doesn't remove the file afterwards, so successive runs share it.
        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
        let session = mock_matrix_session();

        // First client, registered as lock holder "client1".
        let client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

        // Mock receiving an event to create an internal room.
        // NOTE(review): the client is built with a hard-coded homeserver URL
        // and never receives the address of this mock server — confirm whether
        // the mounted mock is reachable at all.
        let server = MockServer::start().await;
        {
            Mock::given(method("GET"))
                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
                .and(header("authorization", "Bearer 1234"))
                .respond_with(
                    ResponseTemplate::new(200)
                        .set_body_json(EventFactory::new().room_encryption().into_content()),
                )
                .mount(&server)
                .await;
            let f = EventFactory::new().sender(user_id!("@example:localhost"));
            // The synced state includes an encryption event for the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(
                    JoinedRoomBuilder::default()
                        .add_state_event(
                            f.member(user_id!("@example:localhost")).display_name("example"),
                        )
                        .add_state_event(f.default_power_levels())
                        .add_state_event(f.room_encryption()),
                )
                .build_sync_response();
            client.base_client().receive_sync_response(response).await.unwrap();
        }

        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");

        // Step 1, preshare the room keys.
        room.preshare_room_key().await.unwrap();

        // Step 2, force lock invalidation by pretending another client obtained the
        // lock.
        //
        // The second client uses the same sqlite path and session, but registers
        // as lock holder "client2". Both it and its guard go out of scope at the
        // end of this block.
        {
            let client = Client::builder()
                .homeserver_url("http://localhost:1234")
                .request_config(RequestConfig::new().disable_retry())
                .sqlite_store(&sqlite_path, None)
                .build()
                .await
                .unwrap();
            client
                .matrix_auth()
                .restore_session(session.clone(), RoomLoadSettings::default())
                .await
                .unwrap();
            client
                .encryption()
                .enable_cross_process_store_lock("client2".to_owned())
                .await
                .unwrap();

            let guard = client.encryption().spin_lock_store(None).await.unwrap();
            assert!(guard.is_some());
        }

        // Step 3, take the crypto-store lock with the first client again.
        let guard = client.encryption().spin_lock_store(None).await.unwrap();
        assert!(guard.is_some());

        // Step 4, try to encrypt a message.
        let olm = client.olm_machine().await;
        let olm = olm.as_ref().expect("Olm machine wasn't started");

        // Now pretend we're encrypting an event; the olm machine shouldn't rely on
        // caching the outgoing session before.
        //
        // Only success matters here (`unwrap`), the encrypted content itself is
        // not inspected.
        let _encrypted_content = olm
            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
            .await
            .unwrap();
    }
4889
4890 #[async_test]
4891 async fn test_composer_draft() {
4892 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4893
4894 let client = logged_in_client(None).await;
4895
4896 let response = SyncResponseBuilder::default()
4897 .add_joined_room(JoinedRoomBuilder::default())
4898 .build_sync_response();
4899 client.base_client().receive_sync_response(response).await.unwrap();
4900 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4901
4902 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4903
4904 // Save 2 drafts, one for the room and one for a thread.
4905
4906 let draft = ComposerDraft {
4907 plain_text: "Hello, world!".to_owned(),
4908 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4909 draft_type: ComposerDraftType::NewMessage,
4910 attachments: vec![DraftAttachment {
4911 filename: "cat.txt".to_owned(),
4912 content: matrix_sdk_base::DraftAttachmentContent::File {
4913 data: b"meow".to_vec(),
4914 mimetype: Some("text/plain".to_owned()),
4915 size: Some(5),
4916 },
4917 }],
4918 };
4919
4920 room.save_composer_draft(draft.clone(), None).await.unwrap();
4921
4922 let thread_root = owned_event_id!("$thread_root:b.c");
4923 let thread_draft = ComposerDraft {
4924 plain_text: "Hello, thread!".to_owned(),
4925 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4926 draft_type: ComposerDraftType::NewMessage,
4927 attachments: vec![DraftAttachment {
4928 filename: "dog.txt".to_owned(),
4929 content: matrix_sdk_base::DraftAttachmentContent::File {
4930 data: b"wuv".to_vec(),
4931 mimetype: Some("text/plain".to_owned()),
4932 size: Some(4),
4933 },
4934 }],
4935 };
4936
4937 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4938
4939 // Check that the room draft was saved correctly
4940 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4941
4942 // Check that the thread draft was saved correctly
4943 assert_eq!(
4944 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4945 Some(thread_draft.clone())
4946 );
4947
4948 // Clear the room draft
4949 room.clear_composer_draft(None).await.unwrap();
4950 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4951
4952 // Check that the thread one is still there
4953 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4954
4955 // Clear the thread draft as well
4956 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4957 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4958 }
4959
4960 #[async_test]
4961 async fn test_mark_join_requests_as_seen() {
4962 let server = MatrixMockServer::new().await;
4963 let client = server.client_builder().build().await;
4964 let event_id = event_id!("$a:b.c");
4965 let room_id = room_id!("!a:b.c");
4966 let user_id = user_id!("@alice:b.c");
4967
4968 let f = EventFactory::new().room(room_id);
4969 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4970 f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4971 ]);
4972 let room = server.sync_room(&client, joined_room_builder).await;
4973
4974 // When loading the initial seen ids, there are none
4975 let seen_ids =
4976 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4977 assert!(seen_ids.is_empty());
4978
4979 // We mark a random event id as seen
4980 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4981 .await
4982 .expect("Couldn't mark join request as seen");
4983
4984 // Then we can check it was successfully marked as seen
4985 let seen_ids =
4986 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4987 assert_eq!(seen_ids.len(), 1);
4988 assert_eq!(
4989 seen_ids.into_iter().next().expect("No next value"),
4990 (event_id.to_owned(), user_id.to_owned())
4991 )
4992 }
4993
4994 #[async_test]
4995 async fn test_own_room_membership_with_no_own_member_event() {
4996 let server = MatrixMockServer::new().await;
4997 let client = server.client_builder().build().await;
4998 let room_id = room_id!("!a:b.c");
4999
5000 let room = server.sync_joined_room(&client, room_id).await;
5001
5002 // Since there is no member event for the own user, the method fails.
5003 // This should never happen in an actual room.
5004 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
5005 assert!(error.is_some());
5006 }
5007
5008 #[async_test]
5009 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
5010 let server = MatrixMockServer::new().await;
5011 let client = server.client_builder().build().await;
5012 let room_id = room_id!("!a:b.c");
5013 let user_id = user_id!("@example:localhost");
5014
5015 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5016 let joined_room_builder =
5017 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5018 let room = server.sync_room(&client, joined_room_builder).await;
5019
5020 // When we load the membership details
5021 let ret = room
5022 .member_with_sender_info(client.user_id().unwrap())
5023 .await
5024 .expect("Room member info should be available");
5025
5026 // We get the member info for the current user
5027 assert_eq!(ret.room_member.event().user_id(), user_id);
5028
5029 // But there is no info for the sender
5030 assert!(ret.sender_info.is_none());
5031 }
5032
5033 #[async_test]
5034 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5035 let server = MatrixMockServer::new().await;
5036 let client = server.client_builder().build().await;
5037 let room_id = room_id!("!a:b.c");
5038 let user_id = user_id!("@example:localhost");
5039
5040 let f = EventFactory::new().room(room_id).sender(user_id);
5041 let joined_room_builder =
5042 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5043 let room = server.sync_room(&client, joined_room_builder).await;
5044
5045 // When we load the membership details
5046 let ret = room
5047 .member_with_sender_info(client.user_id().unwrap())
5048 .await
5049 .expect("Room member info should be available");
5050
5051 // We get the current user's member info
5052 assert_eq!(ret.room_member.event().user_id(), user_id);
5053
5054 // And the sender has the same info, since it's also the current user
5055 assert!(ret.sender_info.is_some());
5056 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5057 }
5058
5059 #[async_test]
5060 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5061 let server = MatrixMockServer::new().await;
5062 let client = server.client_builder().build().await;
5063 let room_id = room_id!("!a:b.c");
5064 let user_id = user_id!("@example:localhost");
5065 let sender_id = user_id!("@alice:b.c");
5066
5067 let f = EventFactory::new().room(room_id).sender(sender_id);
5068 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5069 f.member(user_id).into(),
5070 // The sender info comes from the sync
5071 f.member(sender_id).into(),
5072 ]);
5073 let room = server.sync_room(&client, joined_room_builder).await;
5074
5075 // When we load the membership details
5076 let ret = room
5077 .member_with_sender_info(client.user_id().unwrap())
5078 .await
5079 .expect("Room member info should be available");
5080
5081 // We get the current user's member info
5082 assert_eq!(ret.room_member.event().user_id(), user_id);
5083
5084 // And also the sender info from the events received in the sync
5085 assert!(ret.sender_info.is_some());
5086 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5087 }
5088
5089 #[async_test]
5090 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5091 let server = MatrixMockServer::new().await;
5092 let client = server.client_builder().build().await;
5093 let room_id = room_id!("!a:b.c");
5094 let user_id = user_id!("@example:localhost");
5095 let sender_id = user_id!("@alice:b.c");
5096
5097 let f = EventFactory::new().room(room_id).sender(sender_id);
5098 let joined_room_builder =
5099 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5100 let room = server.sync_room(&client, joined_room_builder).await;
5101
5102 // We'll receive the member info through the /members endpoint
5103 server
5104 .mock_get_members()
5105 .ok(vec![f.member(sender_id).into_raw()])
5106 .mock_once()
5107 .mount()
5108 .await;
5109
5110 // We get the current user's member info
5111 let ret = room
5112 .member_with_sender_info(client.user_id().unwrap())
5113 .await
5114 .expect("Room member info should be available");
5115
5116 // We get the current user's member info
5117 assert_eq!(ret.room_member.event().user_id(), user_id);
5118
5119 // And also the sender info from the /members endpoint
5120 assert!(ret.sender_info.is_some());
5121 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5122 }
5123
5124 #[async_test]
5125 async fn test_list_threads() {
5126 let server = MatrixMockServer::new().await;
5127 let client = server.client_builder().build().await;
5128
5129 let room_id = room_id!("!a:b.c");
5130 let sender_id = user_id!("@alice:b.c");
5131 let f = EventFactory::new().room(room_id).sender(sender_id);
5132
5133 let eid1 = event_id!("$1");
5134 let eid2 = event_id!("$2");
5135 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5136 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5137
5138 server
5139 .mock_room_threads()
5140 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5141 .mock_once()
5142 .mount()
5143 .await;
5144 server
5145 .mock_room_threads()
5146 .match_from("prev_batch")
5147 .ok(batch2, None)
5148 .mock_once()
5149 .mount()
5150 .await;
5151
5152 let room = server.sync_joined_room(&client, room_id).await;
5153 let result =
5154 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5155 assert_eq!(result.chunk.len(), 1);
5156 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5157 assert!(result.prev_batch_token.is_some());
5158
5159 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5160 let result = room.list_threads(opts).await.expect("Failed to list threads");
5161 assert_eq!(result.chunk.len(), 1);
5162 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5163 assert!(result.prev_batch_token.is_none());
5164 }
5165
5166 #[async_test]
5167 async fn test_relations() {
5168 let server = MatrixMockServer::new().await;
5169 let client = server.client_builder().build().await;
5170
5171 let room_id = room_id!("!a:b.c");
5172 let sender_id = user_id!("@alice:b.c");
5173 let f = EventFactory::new().room(room_id).sender(sender_id);
5174
5175 let target_event_id = owned_event_id!("$target");
5176 let eid1 = event_id!("$1");
5177 let eid2 = event_id!("$2");
5178 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5179 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5180
5181 server
5182 .mock_room_relations()
5183 .match_target_event(target_event_id.clone())
5184 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5185 .mock_once()
5186 .mount()
5187 .await;
5188
5189 server
5190 .mock_room_relations()
5191 .match_target_event(target_event_id.clone())
5192 .match_from("next_batch")
5193 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5194 .mock_once()
5195 .mount()
5196 .await;
5197
5198 let room = server.sync_joined_room(&client, room_id).await;
5199
5200 // Main endpoint: no relation type filtered out.
5201 let mut opts = RelationsOptions {
5202 include_relations: IncludeRelations::AllRelations,
5203 ..Default::default()
5204 };
5205 let result = room
5206 .relations(target_event_id.clone(), opts.clone())
5207 .await
5208 .expect("Failed to list relations the first time");
5209 assert_eq!(result.chunk.len(), 1);
5210 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5211 assert!(result.prev_batch_token.is_none());
5212 assert!(result.next_batch_token.is_some());
5213 assert!(result.recursion_depth.is_none());
5214
5215 opts.from = result.next_batch_token;
5216 let result = room
5217 .relations(target_event_id, opts)
5218 .await
5219 .expect("Failed to list relations the second time");
5220 assert_eq!(result.chunk.len(), 1);
5221 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5222 assert!(result.prev_batch_token.is_none());
5223 assert!(result.next_batch_token.is_none());
5224 assert!(result.recursion_depth.is_none());
5225 }
5226
5227 #[async_test]
5228 async fn test_relations_with_reltype() {
5229 let server = MatrixMockServer::new().await;
5230 let client = server.client_builder().build().await;
5231
5232 let room_id = room_id!("!a:b.c");
5233 let sender_id = user_id!("@alice:b.c");
5234 let f = EventFactory::new().room(room_id).sender(sender_id);
5235
5236 let target_event_id = owned_event_id!("$target");
5237 let eid1 = event_id!("$1");
5238 let eid2 = event_id!("$2");
5239 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5240 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5241
5242 server
5243 .mock_room_relations()
5244 .match_target_event(target_event_id.clone())
5245 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5246 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5247 .mock_once()
5248 .mount()
5249 .await;
5250
5251 server
5252 .mock_room_relations()
5253 .match_target_event(target_event_id.clone())
5254 .match_from("next_batch")
5255 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5256 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5257 .mock_once()
5258 .mount()
5259 .await;
5260
5261 let room = server.sync_joined_room(&client, room_id).await;
5262
5263 // Reltype-filtered endpoint, for threads \o/
5264 let mut opts = RelationsOptions {
5265 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5266 ..Default::default()
5267 };
5268 let result = room
5269 .relations(target_event_id.clone(), opts.clone())
5270 .await
5271 .expect("Failed to list relations the first time");
5272 assert_eq!(result.chunk.len(), 1);
5273 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5274 assert!(result.prev_batch_token.is_none());
5275 assert!(result.next_batch_token.is_some());
5276 assert!(result.recursion_depth.is_none());
5277
5278 opts.from = result.next_batch_token;
5279 let result = room
5280 .relations(target_event_id, opts)
5281 .await
5282 .expect("Failed to list relations the second time");
5283 assert_eq!(result.chunk.len(), 1);
5284 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5285 assert!(result.prev_batch_token.is_none());
5286 assert!(result.next_batch_token.is_none());
5287 assert!(result.recursion_depth.is_none());
5288 }
5289
5290 #[async_test]
5291 async fn test_power_levels_computation() {
5292 let server = MatrixMockServer::new().await;
5293 let client = server.client_builder().build().await;
5294
5295 let room_id = room_id!("!a:b.c");
5296 let sender_id = client.user_id().expect("No session id");
5297 let f = EventFactory::new().room(room_id).sender(sender_id);
5298 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5299
5300 // Computing the power levels will need these 3 state events:
5301 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5302 let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5303 let room_member_event = f.member(sender_id).into();
5304
5305 // With only the room member event
5306 let room = server
5307 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5308 .await;
5309 let ctx = room
5310 .push_condition_room_ctx()
5311 .await
5312 .expect("Failed to get push condition context")
5313 .expect("Could not get push condition context");
5314
5315 // The internal power levels couldn't be computed
5316 assert!(ctx.power_levels.is_none());
5317
5318 // Adding the room creation event
5319 let room = server
5320 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5321 .await;
5322 let ctx = room
5323 .push_condition_room_ctx()
5324 .await
5325 .expect("Failed to get push condition context")
5326 .expect("Could not get push condition context");
5327
5328 // The internal power levels still couldn't be computed
5329 assert!(ctx.power_levels.is_none());
5330
5331 // With the room member, room creation and the power levels events
5332 let room = server
5333 .sync_room(
5334 &client,
5335 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5336 )
5337 .await;
5338 let ctx = room
5339 .push_condition_room_ctx()
5340 .await
5341 .expect("Failed to get push condition context")
5342 .expect("Could not get push condition context");
5343
5344 // The internal power levels can finally be computed
5345 assert!(ctx.power_levels.is_some());
5346 }
5347}