matrix_sdk/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18    borrow::Borrow,
19    collections::{BTreeMap, HashMap},
20    future::Future,
21    ops::Deref,
22    sync::Arc,
23    time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30    StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39    IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43    ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44    StateChanges, StateStoreDataKey, StateStoreDataValue,
45    deserialized_responses::{
46        RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47    },
48    media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49    store::{StateStoreExt, ThreadSubscriptionStatus},
50};
51#[cfg(feature = "e2e-encryption")]
52use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
53#[cfg(feature = "e2e-encryption")]
54use matrix_sdk_common::BoxFuture;
55use matrix_sdk_common::{
56    deserialized_responses::TimelineEvent,
57    executor::{JoinHandle, spawn},
58    timeout::timeout,
59};
60#[cfg(feature = "experimental-search")]
61use matrix_sdk_search::error::IndexError;
62#[cfg(feature = "experimental-search")]
63#[cfg(doc)]
64use matrix_sdk_search::index::RoomIndex;
65use mime::Mime;
66use reply::Reply;
67#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
68use ruma::events::AnySyncMessageLikeEvent;
69#[cfg(feature = "experimental-encrypted-state-events")]
70use ruma::events::AnySyncStateEvent;
71#[cfg(feature = "unstable-msc4274")]
72use ruma::events::room::message::GalleryItemType;
73#[cfg(feature = "e2e-encryption")]
74use ruma::events::{
75    AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
76};
77use ruma::{
78    EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
79    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
80    api::client::{
81        config::{set_global_account_data, set_room_account_data},
82        context,
83        error::ErrorKind,
84        filter::LazyLoadOptions,
85        membership::{
86            Invite3pid, ban_user, forget_room, get_member_events,
87            invite_user::{self, v3::InvitationRecipient},
88            kick_user, leave_room, unban_user,
89        },
90        message::send_message_event,
91        read_marker::set_read_marker,
92        receipt::create_receipt,
93        redact::redact_event,
94        room::{get_room_event, report_content, report_room},
95        state::{get_state_event_for_key, send_state_event},
96        tag::{create_tag, delete_tag},
97        threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
98        typing::create_typing_event::{self, v3::Typing},
99    },
100    assign,
101    events::{
102        AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
103        Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
104        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
105        RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
106        StaticStateEventContent, SyncStateEvent,
107        beacon::BeaconEventContent,
108        beacon_info::BeaconInfoEventContent,
109        direct::DirectEventContent,
110        marked_unread::MarkedUnreadEventContent,
111        receipt::{Receipt, ReceiptThread, ReceiptType},
112        room::{
113            ImageInfo, MediaSource, ThumbnailInfo,
114            avatar::{self, RoomAvatarEventContent},
115            encryption::RoomEncryptionEventContent,
116            history_visibility::HistoryVisibility,
117            member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
118            message::{
119                AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
120                ImageMessageEventContent, MessageType, RoomMessageEventContent,
121                TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
122                UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
123            },
124            name::RoomNameEventContent,
125            pinned_events::RoomPinnedEventsEventContent,
126            power_levels::{
127                RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
128            },
129            server_acl::RoomServerAclEventContent,
130            topic::RoomTopicEventContent,
131        },
132        space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
133        tag::{TagInfo, TagName},
134        typing::SyncTypingEvent,
135    },
136    int,
137    push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
138    serde::Raw,
139    time::Instant,
140};
141#[cfg(feature = "experimental-encrypted-state-events")]
142use ruma::{
143    events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
144    serde::JsonCastable,
145};
146use serde::de::DeserializeOwned;
147use thiserror::Error;
148use tokio::{join, sync::broadcast};
149use tracing::{debug, error, info, instrument, trace, warn};
150
151use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
152pub use self::{
153    member::{RoomMember, RoomMemberRole},
154    messages::{
155        EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
156        Relations, RelationsOptions, ThreadRoots,
157    },
158};
159#[cfg(feature = "e2e-encryption")]
160use crate::encryption::backups::BackupState;
161#[cfg(doc)]
162use crate::event_cache::EventCache;
163#[cfg(feature = "experimental-encrypted-state-events")]
164use crate::room::futures::{SendRawStateEvent, SendStateEvent};
165use crate::{
166    BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
167    attachment::{AttachmentConfig, AttachmentInfo},
168    client::WeakClient,
169    config::RequestConfig,
170    error::{BeaconError, WrongRoomState},
171    event_cache::{self, EventCacheDropHandles, RoomEventCache},
172    event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
173    live_location_share::ObservableLiveLocation,
174    media::{MediaFormat, MediaRequestParameters},
175    notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
176    room::{
177        knock_requests::{KnockRequest, KnockRequestMemberInfo},
178        power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
179        privacy_settings::RoomPrivacySettings,
180    },
181    sync::RoomUpdate,
182    utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
183};
184
185pub mod edit;
186pub mod futures;
187pub mod identity_status_changes;
188/// Contains code related to requests to join a room.
189pub mod knock_requests;
190mod member;
191mod messages;
192pub mod power_levels;
193pub mod reply;
194
195pub mod calls;
196
197/// Contains all the functionality for modifying the privacy settings in a room.
198pub mod privacy_settings;
199
200#[cfg(feature = "e2e-encryption")]
201pub(crate) mod shared_room_history;
202
203/// A struct containing methods that are common for Joined, Invited and Left
204/// Rooms
205#[derive(Debug, Clone)]
206pub struct Room {
207    inner: BaseRoom,
208    pub(crate) client: Client,
209}
210
211impl Deref for Room {
212    type Target = BaseRoom;
213
214    fn deref(&self) -> &Self::Target {
215        &self.inner
216    }
217}
218
219const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
220const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
221
222/// A thread subscription, according to the semantics of MSC4306.
223#[derive(Debug, Clone, Copy, PartialEq, Eq)]
224pub struct ThreadSubscription {
225    /// Whether the subscription was made automatically by a client, not by
226    /// manual user choice.
227    pub automatic: bool,
228}
229
230/// Context allowing to compute the push actions for a given event.
231#[derive(Debug)]
232pub struct PushContext {
233    /// The Ruma context used to compute the push actions.
234    push_condition_room_ctx: PushConditionRoomCtx,
235
236    /// Push rules for this room, based on the push rules state event, or the
237    /// global server default as defined by [`Ruleset::server_default`].
238    push_rules: Ruleset,
239}
240
241impl PushContext {
242    /// Create a new [`PushContext`] from its inner components.
243    pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
244        Self { push_condition_room_ctx, push_rules }
245    }
246
247    /// Compute the push rules for a given event.
248    pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
249        self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
250    }
251
252    /// Compute the push rules for a given event, with extra logging to help
253    /// debugging.
254    #[doc(hidden)]
255    #[instrument(skip_all)]
256    pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
257        let rules = self
258            .push_rules
259            .iter()
260            .filter_map(|r| {
261                if !r.enabled() {
262                    return None;
263                }
264
265                let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
266
267                let conditions = match r {
268                    AnyPushRuleRef::Override(r) => {
269                        format!("{:?}", r.conditions)
270                    }
271                    AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
272                    AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
273                    AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
274                    AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
275                    _ => "<unknown push rule kind>".to_owned(),
276                };
277
278                Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
279            })
280            .collect::<Vec<_>>()
281            .join("\n");
282        trace!("rules:\n\n{rules}\n\n");
283
284        let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
285
286        if let Some(found) = found {
287            trace!("rule {} matched", found.rule_id());
288            found.actions().to_owned()
289        } else {
290            trace!("no match");
291            Vec::new()
292        }
293    }
294}
295
296macro_rules! make_media_type {
297    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
298        // If caption is set, use it as body, and filename as the file name; otherwise,
299        // body is the filename, and the filename is not set.
300        // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
301        let (body, formatted, filename) = match $caption {
302            Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
303            None => ($filename, None, None),
304        };
305
306        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
307
308        match $content_type.type_() {
309            mime::IMAGE => {
310                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
311                    mimetype: Some($content_type.as_ref().to_owned()),
312                    thumbnail_source,
313                    thumbnail_info
314                });
315                let content = assign!(ImageMessageEventContent::new(body, $source), {
316                    info: Some(Box::new(info)),
317                    formatted,
318                    filename
319                });
320                <$t>::Image(content)
321            }
322
323            mime::AUDIO => {
324                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
325                    formatted,
326                    filename
327                });
328
329                if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
330                 let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
331                    let waveform = waveform_vec
332                        .iter()
333                        .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
334                        .map(Into::into)
335                        .collect();
336                    content.audio =
337                        Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
338                }
339
340                if matches!($info, Some(AttachmentInfo::Voice(_))) {
341                    content.voice = Some(UnstableVoiceContentBlock::new());
342                }
343
344                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
345                audio_info.mimetype = Some($content_type.as_ref().to_owned());
346                let content = content.info(Box::new(audio_info));
347
348                <$t>::Audio(content)
349            }
350
351            mime::VIDEO => {
352                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
353                    mimetype: Some($content_type.as_ref().to_owned()),
354                    thumbnail_source,
355                    thumbnail_info
356                });
357                let content = assign!(VideoMessageEventContent::new(body, $source), {
358                    info: Some(Box::new(info)),
359                    formatted,
360                    filename
361                });
362                <$t>::Video(content)
363            }
364
365            _ => {
366                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
367                    mimetype: Some($content_type.as_ref().to_owned()),
368                    thumbnail_source,
369                    thumbnail_info
370                });
371                let content = assign!(FileMessageEventContent::new(body, $source), {
372                    info: Some(Box::new(info)),
373                    formatted,
374                    filename,
375                });
376                <$t>::File(content)
377            }
378        }
379    }};
380}
381
382impl Room {
383    /// Create a new `Room`
384    ///
385    /// # Arguments
386    /// * `client` - The client used to make requests.
387    ///
388    /// * `room` - The underlying room.
389    pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
390        Self { inner: room, client }
391    }
392
393    /// Leave this room.
394    /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
395    /// automatically.
396    ///
397    /// Only invited and joined rooms can be left.
398    #[doc(alias = "reject_invitation")]
399    #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
400    async fn leave_impl(&self) -> (Result<()>, &Room) {
401        let state = self.state();
402        if state == RoomState::Left {
403            return (
404                Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
405                    "Joined or Invited",
406                    state,
407                )))),
408                self,
409            );
410        }
411
412        // If the room was in Invited state we should also forget it when declining the
413        // invite.
414        let should_forget = matches!(self.state(), RoomState::Invited);
415
416        let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
417        let response = self.client.send(request).await;
418
419        // The server can return with an error that is acceptable to ignore. Let's find
420        // which one.
421        if let Err(error) = response {
422            #[allow(clippy::collapsible_match)]
423            let ignore_error = if let Some(error) = error.client_api_error_kind() {
424                match error {
425                    // The user is trying to leave a room but doesn't have permissions to do so.
426                    // Let's consider the user has left the room.
427                    ErrorKind::Forbidden { .. } => true,
428                    _ => false,
429                }
430            } else {
431                false
432            };
433
434            error!(?error, ignore_error, should_forget, "Failed to leave the room");
435
436            if !ignore_error {
437                return (Err(error.into()), self);
438            }
439        }
440
441        if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
442            return (Err(e.into()), self);
443        }
444
445        if should_forget {
446            trace!("Trying to forget the room");
447
448            if let Err(error) = self.forget().await {
449                error!(?error, "Failed to forget the room");
450            }
451        }
452
453        (Ok(()), self)
454    }
455
456    /// Leave this room and all predecessors.
457    /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
458    /// automatically.
459    ///
460    /// Only invited and joined rooms can be left.
461    /// Will return an error if the current room fails to leave but
462    /// will only warn if a predecessor fails to leave.
463    pub async fn leave(&self) -> Result<()> {
464        let mut rooms: Vec<Room> = vec![self.clone()];
465        let mut current_room = self;
466
467        while let Some(predecessor) = current_room.predecessor_room() {
468            let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
469
470            if let Some(predecessor_room) = maybe_predecessor_room {
471                rooms.push(predecessor_room.clone());
472                current_room = rooms.last().expect("Room just pushed so can't be empty");
473            } else {
474                warn!("Cannot find predecessor room");
475                break;
476            }
477        }
478
479        let batch_size = 5;
480
481        let rooms_futures: Vec<_> = rooms
482            .iter()
483            .filter_map(|room| match room.state() {
484                RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
485                    Some(room.leave_impl())
486                }
487                RoomState::Banned | RoomState::Left => None,
488            })
489            .collect();
490
491        let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
492
493        let mut maybe_this_room_failed_with: Option<Error> = None;
494
495        while let Some(result) = futures_stream.next().await {
496            if let (Err(e), room) = result {
497                if room.room_id() == self.room_id() {
498                    maybe_this_room_failed_with = Some(e);
499                } else {
500                    warn!("Failure while attempting to leave predecessor room: {e:?}");
501                }
502            }
503        }
504
505        maybe_this_room_failed_with.map_or(Ok(()), Err)
506    }
507
508    /// Join this room.
509    ///
510    /// Only invited and left rooms can be joined via this method.
511    #[doc(alias = "accept_invitation")]
512    pub async fn join(&self) -> Result<()> {
513        let prev_room_state = self.inner.state();
514
515        if prev_room_state == RoomState::Joined {
516            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
517                "Invited or Left",
518                prev_room_state,
519            ))));
520        }
521
522        self.client.join_room_by_id(self.room_id()).await?;
523
524        Ok(())
525    }
526
527    /// Get the inner client saved in this room instance.
528    ///
529    /// Returns the client this room is part of.
530    pub fn client(&self) -> Client {
531        self.client.clone()
532    }
533
534    /// Get the sync state of this room, i.e. whether it was fully synced with
535    /// the server.
536    pub fn is_synced(&self) -> bool {
537        self.inner.is_state_fully_synced()
538    }
539
540    /// Gets the avatar of this room, if set.
541    ///
542    /// Returns the avatar.
543    /// If a thumbnail is requested no guarantee on the size of the image is
544    /// given.
545    ///
546    /// # Arguments
547    ///
548    /// * `format` - The desired format of the avatar.
549    ///
550    /// # Examples
551    ///
552    /// ```no_run
553    /// # use matrix_sdk::Client;
554    /// # use matrix_sdk::ruma::room_id;
555    /// # use matrix_sdk::media::MediaFormat;
556    /// # use url::Url;
557    /// # let homeserver = Url::parse("http://example.com").unwrap();
558    /// # async {
559    /// # let user = "example";
560    /// let client = Client::new(homeserver).await.unwrap();
561    /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
562    /// let room_id = room_id!("!roomid:example.com");
563    /// let room = client.get_room(&room_id).unwrap();
564    /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
565    ///     std::fs::write("avatar.png", avatar);
566    /// }
567    /// # };
568    /// ```
569    pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
570        let Some(url) = self.avatar_url() else { return Ok(None) };
571        let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
572        Ok(Some(self.client.media().get_media_content(&request, true).await?))
573    }
574
575    /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
576    /// returns a `Messages` struct that contains a chunk of room and state
577    /// events (`RoomEvent` and `AnyStateEvent`).
578    ///
579    /// With the encryption feature, messages are decrypted if possible. If
580    /// decryption fails for an individual message, that message is returned
581    /// undecrypted.
582    ///
583    /// # Examples
584    ///
585    /// ```no_run
586    /// use matrix_sdk::{Client, room::MessagesOptions};
587    /// # use matrix_sdk::ruma::{
588    /// #     api::client::filter::RoomEventFilter,
589    /// #     room_id,
590    /// # };
591    /// # use url::Url;
592    ///
593    /// # let homeserver = Url::parse("http://example.com").unwrap();
594    /// # async {
595    /// let options =
596    ///     MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
597    ///
598    /// let mut client = Client::new(homeserver).await.unwrap();
599    /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
600    /// assert!(room.messages(options).await.is_ok());
601    /// # };
602    /// ```
603    #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
604    pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
605        let room_id = self.inner.room_id();
606        let request = options.into_request(room_id);
607        let http_response = self.client.send(request).await?;
608
609        let push_ctx = self.push_context().await?;
610        let chunk = join_all(
611            http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
612        )
613        .await;
614
615        Ok(Messages {
616            start: http_response.start,
617            end: http_response.end,
618            chunk,
619            state: http_response.state,
620        })
621    }
622
623    /// Register a handler for events of a specific type, within this room.
624    ///
625    /// This method works the same way as [`Client::add_event_handler`], except
626    /// that the handler will only be called for events within this room. See
627    /// that method for more details on event handler functions.
628    ///
629    /// `room.add_event_handler(hdl)` is equivalent to
630    /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
631    /// convenient in your use case.
632    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
633    where
634        Ev: SyncEvent + DeserializeOwned + Send + 'static,
635        H: EventHandler<Ev, Ctx>,
636    {
637        self.client.add_room_event_handler(self.room_id(), handler)
638    }
639
640    /// Subscribe to all updates for this room.
641    ///
642    /// The returned receiver will receive a new message for each sync response
643    /// that contains updates for this room.
644    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
645        self.client.subscribe_to_room_updates(self.room_id())
646    }
647
648    /// Subscribe to typing notifications for this room.
649    ///
650    /// The returned receiver will receive a new vector of user IDs for each
651    /// sync response that contains 'm.typing' event. The current user ID will
652    /// be filtered out.
653    pub fn subscribe_to_typing_notifications(
654        &self,
655    ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
656        let (sender, receiver) = broadcast::channel(16);
657        let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
658            let own_user_id = self.own_user_id().to_owned();
659            move |event: SyncTypingEvent| async move {
660                // Ignore typing notifications from own user.
661                let typing_user_ids = event
662                    .content
663                    .user_ids
664                    .into_iter()
665                    .filter(|user_id| *user_id != own_user_id)
666                    .collect();
667                // Ignore the result. It can only fail if there are no listeners.
668                let _ = sender.send(typing_user_ids);
669            }
670        });
671        let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
672        (drop_guard, receiver)
673    }
674
675    /// Subscribe to updates about users who are in "pin violation" i.e. their
676    /// identity has changed and the user has not yet acknowledged this.
677    ///
678    /// The returned receiver will receive a new vector of
679    /// [`IdentityStatusChange`] each time a /keys/query response shows a
680    /// changed identity for a member of this room, or a sync shows a change
681    /// to the membership of an affected user. (Changes to the current user are
682    /// not directly included, but some changes to the current user's identity
683    /// can trigger changes to how we see other users' identities, which
684    /// will be included.)
685    ///
686    /// The first item in the stream provides the current state of the room:
687    /// each member of the room who is not in "pinned" or "verified" state will
688    /// be included (except the current user).
689    ///
690    /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
691    /// `PinViolation` then a warning should be displayed to the user. If it is
692    /// set to `Pinned` then no warning should be displayed.
693    ///
694    /// Note that if a user who is in pin violation leaves the room, a `Pinned`
695    /// update is sent, to indicate that the warning should be removed, even
696    /// though the user's identity is not necessarily pinned.
697    #[cfg(feature = "e2e-encryption")]
698    pub async fn subscribe_to_identity_status_changes(
699        &self,
700    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
701        IdentityStatusChanges::create_stream(self.clone()).await
702    }
703
704    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
705    /// decrypted if needs be.
706    ///
707    /// Only logs from the crypto crate will indicate a failure to decrypt.
708    #[cfg(not(feature = "experimental-encrypted-state-events"))]
709    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
710    async fn try_decrypt_event(
711        &self,
712        event: Raw<AnyTimelineEvent>,
713        push_ctx: Option<&PushContext>,
714    ) -> TimelineEvent {
715        #[cfg(feature = "e2e-encryption")]
716        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
717            SyncMessageLikeEvent::Original(_),
718        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
719            && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
720        {
721            return event;
722        }
723
724        let mut event = TimelineEvent::from_plaintext(event.cast());
725        if let Some(push_ctx) = push_ctx {
726            event.set_push_actions(push_ctx.for_event(event.raw()).await);
727        }
728
729        event
730    }
731
732    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
733    /// decrypted if needs be.
734    ///
735    /// Only logs from the crypto crate will indicate a failure to decrypt.
736    #[cfg(feature = "experimental-encrypted-state-events")]
737    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
738    async fn try_decrypt_event(
739        &self,
740        event: Raw<AnyTimelineEvent>,
741        push_ctx: Option<&PushContext>,
742    ) -> TimelineEvent {
743        // If we have either an encrypted message-like or state event, try to decrypt.
744        match event.deserialize_as::<AnySyncTimelineEvent>() {
745            Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
746                SyncMessageLikeEvent::Original(_),
747            ))) => {
748                if let Ok(event) = self
749                    .decrypt_event(
750                        event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
751                        push_ctx,
752                    )
753                    .await
754                {
755                    return event;
756                }
757            }
758            Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
759                SyncStateEvent::Original(_),
760            ))) => {
761                if let Ok(event) = self
762                    .decrypt_event(
763                        event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
764                        push_ctx,
765                    )
766                    .await
767                {
768                    return event;
769                }
770            }
771            _ => {}
772        }
773
774        let mut event = TimelineEvent::from_plaintext(event.cast());
775        if let Some(push_ctx) = push_ctx {
776            event.set_push_actions(push_ctx.for_event(event.raw()).await);
777        }
778
779        event
780    }
781
782    /// Fetch the event with the given `EventId` in this room.
783    ///
784    /// It uses the given [`RequestConfig`] if provided, or the client's default
785    /// one otherwise.
786    pub async fn event(
787        &self,
788        event_id: &EventId,
789        request_config: Option<RequestConfig>,
790    ) -> Result<TimelineEvent> {
791        let request =
792            get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
793
794        let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
795        let push_ctx = self.push_context().await?;
796        let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
797
798        // Save the event into the event cache, if it's set up.
799        if let Ok((cache, _handles)) = self.event_cache().await {
800            cache.save_events([event.clone()]).await;
801        }
802
803        Ok(event)
804    }
805
806    /// Try to load the event from the [`EventCache`][crate::event_cache], if
807    /// it's enabled, or fetch it from the homeserver.
808    ///
809    /// When running the request against the homeserver, it uses the given
810    /// [`RequestConfig`] if provided, or the client's default one
811    /// otherwise.
812    pub async fn load_or_fetch_event(
813        &self,
814        event_id: &EventId,
815        request_config: Option<RequestConfig>,
816    ) -> Result<TimelineEvent> {
817        match self.event_cache().await {
818            Ok((event_cache, _drop_handles)) => {
819                if let Some(event) = event_cache.find_event(event_id).await? {
820                    return Ok(event);
821                }
822                // Fallthrough: try with a request.
823            }
824            Err(err) => {
825                debug!("error when getting the event cache: {err}");
826            }
827        }
828        self.event(event_id, request_config).await
829    }
830
831    /// Fetch the event with the given `EventId` in this room, using the
832    /// `/context` endpoint to get more information.
833    pub async fn event_with_context(
834        &self,
835        event_id: &EventId,
836        lazy_load_members: bool,
837        context_size: UInt,
838        request_config: Option<RequestConfig>,
839    ) -> Result<EventWithContextResponse> {
840        let mut request =
841            context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
842
843        request.limit = context_size;
844
845        if lazy_load_members {
846            request.filter.lazy_load_options =
847                LazyLoadOptions::Enabled { include_redundant_members: false };
848        }
849
850        let response = self.client.send(request).with_request_config(request_config).await?;
851
852        let push_ctx = self.push_context().await?;
853        let push_ctx = push_ctx.as_ref();
854        let target_event = if let Some(event) = response.event {
855            Some(self.try_decrypt_event(event, push_ctx).await)
856        } else {
857            None
858        };
859
860        // Note: the joined future will fail if any future failed, but
861        // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
862        // decryption error, so we should prevent against most bad cases here.
863        let (events_before, events_after) = join!(
864            join_all(
865                response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
866            ),
867            join_all(
868                response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
869            ),
870        );
871
872        // Save the loaded events into the event cache, if it's set up.
873        if let Ok((cache, _handles)) = self.event_cache().await {
874            let mut events_to_save: Vec<TimelineEvent> = Vec::new();
875            if let Some(event) = &target_event {
876                events_to_save.push(event.clone());
877            }
878
879            for event in &events_before {
880                events_to_save.push(event.clone());
881            }
882
883            for event in &events_after {
884                events_to_save.push(event.clone());
885            }
886
887            cache.save_events(events_to_save).await;
888        }
889
890        Ok(EventWithContextResponse {
891            event: target_event,
892            events_before,
893            events_after,
894            state: response.state,
895            prev_batch_token: response.start,
896            next_batch_token: response.end,
897        })
898    }
899
900    pub(crate) async fn request_members(&self) -> Result<()> {
901        self.client
902            .locks()
903            .members_request_deduplicated_handler
904            .run(self.room_id().to_owned(), async move {
905                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
906                let response = self
907                    .client
908                    .send(request.clone())
909                    .with_request_config(
910                        // In some cases it can take longer than 30s to load:
911                        // https://github.com/element-hq/synapse/issues/16872
912                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
913                    )
914                    .await?;
915
916                // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
917                Box::pin(self.client.base_client().receive_all_members(
918                    self.room_id(),
919                    &request,
920                    &response,
921                ))
922                .await?;
923
924                Ok(())
925            })
926            .await
927    }
928
929    /// Request to update the encryption state for this room.
930    ///
931    /// It does nothing if the encryption state is already
932    /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
933    pub async fn request_encryption_state(&self) -> Result<()> {
934        if !self.inner.encryption_state().is_unknown() {
935            return Ok(());
936        }
937
938        self.client
939            .locks()
940            .encryption_state_deduplicated_handler
941            .run(self.room_id().to_owned(), async move {
942                // Request the event from the server.
943                let request = get_state_event_for_key::v3::Request::new(
944                    self.room_id().to_owned(),
945                    StateEventType::RoomEncryption,
946                    "".to_owned(),
947                );
948                let response = match self.client.send(request).await {
949                    Ok(response) => Some(
950                        response
951                            .into_content()
952                            .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
953                    ),
954                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
955                    Err(err) => return Err(err.into()),
956                };
957
958                let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
959
960                // Persist the event and the fact that we requested it from the server in
961                // `RoomInfo`.
962                let mut room_info = self.clone_info();
963                room_info.mark_encryption_state_synced();
964                room_info.set_encryption_event(response.clone());
965                let mut changes = StateChanges::default();
966                changes.add_room(room_info.clone());
967
968                self.client.state_store().save_changes(&changes).await?;
969                self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
970
971                Ok(())
972            })
973            .await
974    }
975
976    /// Check the encryption state of this room.
977    ///
978    /// If the result is [`EncryptionState::Unknown`], one might want to call
979    /// [`Room::request_encryption_state`].
980    pub fn encryption_state(&self) -> EncryptionState {
981        self.inner.encryption_state()
982    }
983
984    /// Force to update the encryption state by calling
985    /// [`Room::request_encryption_state`], and then calling
986    /// [`Room::encryption_state`].
987    ///
988    /// This method is useful to ensure the encryption state is up-to-date.
989    pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
990        self.request_encryption_state().await?;
991
992        Ok(self.encryption_state())
993    }
994
995    /// Gets additional context info about the client crypto.
996    #[cfg(feature = "e2e-encryption")]
997    pub async fn crypto_context_info(&self) -> CryptoContextInfo {
998        let encryption = self.client.encryption();
999
1000        let this_device_is_verified = match encryption.get_own_device().await {
1001            Ok(Some(device)) => device.is_verified_with_cross_signing(),
1002
1003            // Should not happen, there will always be an own device
1004            _ => true,
1005        };
1006
1007        let backup_exists_on_server =
1008            encryption.backups().exists_on_server().await.unwrap_or(false);
1009
1010        CryptoContextInfo {
1011            device_creation_ts: encryption.device_creation_timestamp().await,
1012            this_device_is_verified,
1013            is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1014            backup_exists_on_server,
1015        }
1016    }
1017
1018    fn are_events_visible(&self) -> bool {
1019        if let RoomState::Invited = self.inner.state() {
1020            return matches!(
1021                self.inner.history_visibility_or_default(),
1022                HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1023            );
1024        }
1025
1026        true
1027    }
1028
1029    /// Sync the member list with the server.
1030    ///
1031    /// This method will de-duplicate requests if it is called multiple times in
1032    /// quick succession, in that case the return value will be `None`. This
1033    /// method does nothing if the members are already synced.
1034    pub async fn sync_members(&self) -> Result<()> {
1035        if !self.are_events_visible() {
1036            return Ok(());
1037        }
1038
1039        if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1040    }
1041
1042    /// Get a specific member of this room.
1043    ///
1044    /// *Note*: This method will fetch the members from the homeserver if the
1045    /// member list isn't synchronized due to member lazy loading. Because of
1046    /// that it might panic if it isn't run on a tokio thread.
1047    ///
1048    /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1049    /// method that doesn't do any requests.
1050    ///
1051    /// # Arguments
1052    ///
1053    /// * `user_id` - The ID of the user that should be fetched out of the
1054    ///   store.
1055    pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1056        self.sync_members().await?;
1057        self.get_member_no_sync(user_id).await
1058    }
1059
1060    /// Get a specific member of this room.
1061    ///
1062    /// *Note*: This method will not fetch the members from the homeserver if
1063    /// the member list isn't synchronized due to member lazy loading. Thus,
1064    /// members could be missing.
1065    ///
1066    /// Use [get_member()](#method.get_member) if you want to ensure to always
1067    /// have the full member list to chose from.
1068    ///
1069    /// # Arguments
1070    ///
1071    /// * `user_id` - The ID of the user that should be fetched out of the
1072    ///   store.
1073    pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1074        Ok(self
1075            .inner
1076            .get_member(user_id)
1077            .await?
1078            .map(|member| RoomMember::new(self.client.clone(), member)))
1079    }
1080
1081    /// Get members for this room, with the given memberships.
1082    ///
1083    /// *Note*: This method will fetch the members from the homeserver if the
1084    /// member list isn't synchronized due to member lazy loading. Because of
1085    /// that it might panic if it isn't run on a tokio thread.
1086    ///
1087    /// Use [members_no_sync()](#method.members_no_sync) if you want a
1088    /// method that doesn't do any requests.
1089    pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1090        self.sync_members().await?;
1091        self.members_no_sync(memberships).await
1092    }
1093
1094    /// Get members for this room, with the given memberships.
1095    ///
1096    /// *Note*: This method will not fetch the members from the homeserver if
1097    /// the member list isn't synchronized due to member lazy loading. Thus,
1098    /// members could be missing.
1099    ///
1100    /// Use [members()](#method.members) if you want to ensure to always get
1101    /// the full member list.
1102    pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1103        Ok(self
1104            .inner
1105            .members(memberships)
1106            .await?
1107            .into_iter()
1108            .map(|member| RoomMember::new(self.client.clone(), member))
1109            .collect())
1110    }
1111
1112    /// Sets the display name of the current user within this room.
1113    ///
1114    /// *Note*: This is different to [`crate::Account::set_display_name`] which
1115    /// updates the user's display name across all of their rooms.
1116    pub async fn set_own_member_display_name(
1117        &self,
1118        display_name: Option<String>,
1119    ) -> Result<send_state_event::v3::Response> {
1120        let user_id = self.own_user_id();
1121        let member_event =
1122            self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1123
1124        let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1125            return Err(Error::InsufficientData);
1126        };
1127
1128        let event = raw_event.deserialize()?;
1129
1130        let mut content = match event {
1131            SyncStateEvent::Original(original_event) => original_event.content,
1132            SyncStateEvent::Redacted(redacted_event) => {
1133                RoomMemberEventContent::new(redacted_event.content.membership)
1134            }
1135        };
1136
1137        content.displayname = display_name;
1138        self.send_state_event_for_key(user_id, content).await
1139    }
1140
1141    /// Get all state events of a given type in this room.
1142    pub async fn get_state_events(
1143        &self,
1144        event_type: StateEventType,
1145    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1146        self.client
1147            .state_store()
1148            .get_state_events(self.room_id(), event_type)
1149            .await
1150            .map_err(Into::into)
1151    }
1152
1153    /// Get all state events of a given statically-known type in this room.
1154    ///
1155    /// # Examples
1156    ///
1157    /// ```no_run
1158    /// # async {
1159    /// # let room: matrix_sdk::Room = todo!();
1160    /// use matrix_sdk::ruma::{
1161    ///     events::room::member::RoomMemberEventContent, serde::Raw,
1162    /// };
1163    ///
1164    /// let room_members =
1165    ///     room.get_state_events_static::<RoomMemberEventContent>().await?;
1166    /// # anyhow::Ok(())
1167    /// # };
1168    /// ```
1169    pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1170    where
1171        C: StaticEventContent<IsPrefix = ruma::events::False>
1172            + StaticStateEventContent
1173            + RedactContent,
1174        C::Redacted: RedactedStateEventContent,
1175    {
1176        Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1177    }
1178
1179    /// Get the state events of a given type with the given state keys in this
1180    /// room.
1181    pub async fn get_state_events_for_keys(
1182        &self,
1183        event_type: StateEventType,
1184        state_keys: &[&str],
1185    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1186        self.client
1187            .state_store()
1188            .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1189            .await
1190            .map_err(Into::into)
1191    }
1192
1193    /// Get the state events of a given statically-known type with the given
1194    /// state keys in this room.
1195    ///
1196    /// # Examples
1197    ///
1198    /// ```no_run
1199    /// # async {
1200    /// # let room: matrix_sdk::Room = todo!();
1201    /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1202    /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1203    ///
1204    /// let room_members = room
1205    ///     .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1206    ///         user_ids,
1207    ///     )
1208    ///     .await?;
1209    /// # anyhow::Ok(())
1210    /// # };
1211    /// ```
1212    pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1213        &self,
1214        state_keys: I,
1215    ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1216    where
1217        C: StaticEventContent<IsPrefix = ruma::events::False>
1218            + StaticStateEventContent
1219            + RedactContent,
1220        C::StateKey: Borrow<K>,
1221        C::Redacted: RedactedStateEventContent,
1222        K: AsRef<str> + Sized + Sync + 'a,
1223        I: IntoIterator<Item = &'a K> + Send,
1224        I::IntoIter: Send,
1225    {
1226        Ok(self
1227            .client
1228            .state_store()
1229            .get_state_events_for_keys_static(self.room_id(), state_keys)
1230            .await?)
1231    }
1232
1233    /// Get a specific state event in this room.
1234    pub async fn get_state_event(
1235        &self,
1236        event_type: StateEventType,
1237        state_key: &str,
1238    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1239        self.client
1240            .state_store()
1241            .get_state_event(self.room_id(), event_type, state_key)
1242            .await
1243            .map_err(Into::into)
1244    }
1245
1246    /// Get a specific state event of statically-known type with an empty state
1247    /// key in this room.
1248    ///
1249    /// # Examples
1250    ///
1251    /// ```no_run
1252    /// # async {
1253    /// # let room: matrix_sdk::Room = todo!();
1254    /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1255    ///
1256    /// let power_levels = room
1257    ///     .get_state_event_static::<RoomPowerLevelsEventContent>()
1258    ///     .await?
1259    ///     .expect("every room has a power_levels event")
1260    ///     .deserialize()?;
1261    /// # anyhow::Ok(())
1262    /// # };
1263    /// ```
1264    pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1265    where
1266        C: StaticEventContent<IsPrefix = ruma::events::False>
1267            + StaticStateEventContent<StateKey = EmptyStateKey>
1268            + RedactContent,
1269        C::Redacted: RedactedStateEventContent,
1270    {
1271        self.get_state_event_static_for_key(&EmptyStateKey).await
1272    }
1273
1274    /// Get a specific state event of statically-known type in this room.
1275    ///
1276    /// # Examples
1277    ///
1278    /// ```no_run
1279    /// # async {
1280    /// # let room: matrix_sdk::Room = todo!();
1281    /// use matrix_sdk::ruma::{
1282    ///     events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1283    /// };
1284    ///
1285    /// let member_event = room
1286    ///     .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1287    ///         "@alice:example.org"
1288    ///     ))
1289    ///     .await?;
1290    /// # anyhow::Ok(())
1291    /// # };
1292    /// ```
1293    pub async fn get_state_event_static_for_key<C, K>(
1294        &self,
1295        state_key: &K,
1296    ) -> Result<Option<RawSyncOrStrippedState<C>>>
1297    where
1298        C: StaticEventContent<IsPrefix = ruma::events::False>
1299            + StaticStateEventContent
1300            + RedactContent,
1301        C::StateKey: Borrow<K>,
1302        C::Redacted: RedactedStateEventContent,
1303        K: AsRef<str> + ?Sized + Sync,
1304    {
1305        Ok(self
1306            .client
1307            .state_store()
1308            .get_state_event_static_for_key(self.room_id(), state_key)
1309            .await?)
1310    }
1311
1312    /// Returns the parents this room advertises as its parents.
1313    ///
1314    /// Results are in no particular order.
1315    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1316        // Implements this algorithm:
1317        // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1318
1319        // Get all m.space.parent events for this room
1320        Ok(self
1321            .get_state_events_static::<SpaceParentEventContent>()
1322            .await?
1323            .into_iter()
1324            // Extract state key (ie. the parent's id) and sender
1325            .filter_map(|parent_event| match parent_event.deserialize() {
1326                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1327                    Some((e.state_key.to_owned(), e.sender))
1328                }
1329                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1330                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1331                Err(e) => {
1332                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1333                    None
1334                }
1335            })
1336            // Check whether the parent recognizes this room as its child
1337            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1338                let Some(parent_room) = self.client.get_room(&state_key) else {
1339                    // We are not in the room, cannot check if the relationship is reciprocal
1340                    // TODO: try peeking into the room
1341                    return Ok(ParentSpace::Unverifiable(state_key));
1342                };
1343                // Get the m.space.child state of the parent with this room's id
1344                // as state key.
1345                if let Some(child_event) = parent_room
1346                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1347                    .await?
1348                {
1349                    match child_event.deserialize() {
1350                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1351                            // There is a valid m.space.child in the parent pointing to
1352                            // this room
1353                            return Ok(ParentSpace::Reciprocal(parent_room));
1354                        }
1355                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1356                        Ok(SyncOrStrippedState::Stripped(_)) => {}
1357                        Err(e) => {
1358                            info!(
1359                                room_id = ?self.room_id(), parent_room_id = ?state_key,
1360                                "Could not deserialize m.space.child: {e}"
1361                            );
1362                        }
1363                    }
1364                    // Otherwise the event is either invalid or redacted. If
1365                    // redacted it would be missing the
1366                    // `via` key, thereby invalidating that end of the
1367                    // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1368                }
1369
1370                // No reciprocal m.space.child found, let's check if the sender has the
1371                // power to set it
1372                let Some(member) = parent_room.get_member(&sender).await? else {
1373                    // Sender is not even in the parent room
1374                    return Ok(ParentSpace::Illegitimate(parent_room));
1375                };
1376
1377                if member.can_send_state(StateEventType::SpaceChild) {
1378                    // Sender does have the power to set m.room.child
1379                    Ok(ParentSpace::WithPowerlevel(parent_room))
1380                } else {
1381                    Ok(ParentSpace::Illegitimate(parent_room))
1382                }
1383            })
1384            .collect::<FuturesUnordered<_>>())
1385    }
1386
1387    /// Read account data in this room, from storage.
1388    pub async fn account_data(
1389        &self,
1390        data_type: RoomAccountDataEventType,
1391    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1392        self.client
1393            .state_store()
1394            .get_room_account_data_event(self.room_id(), data_type)
1395            .await
1396            .map_err(Into::into)
1397    }
1398
1399    /// Get account data of a statically-known type in this room, from storage.
1400    ///
1401    /// # Examples
1402    ///
1403    /// ```no_run
1404    /// # async {
1405    /// # let room: matrix_sdk::Room = todo!();
1406    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1407    ///
1408    /// match room.account_data_static::<FullyReadEventContent>().await? {
1409    ///     Some(fully_read) => {
1410    ///         println!("Found read marker: {:?}", fully_read.deserialize()?)
1411    ///     }
1412    ///     None => println!("No read marker for this room"),
1413    /// }
1414    /// # anyhow::Ok(())
1415    /// # };
1416    /// ```
1417    pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1418    where
1419        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1420    {
1421        Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1422    }
1423
1424    /// Check if all members of this room are verified and all their devices are
1425    /// verified.
1426    ///
1427    /// Returns true if all devices in the room are verified, otherwise false.
1428    #[cfg(feature = "e2e-encryption")]
1429    pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1430        let user_ids = self
1431            .client
1432            .state_store()
1433            .get_user_ids(self.room_id(), RoomMemberships::empty())
1434            .await?;
1435
1436        for user_id in user_ids {
1437            let devices = self.client.encryption().get_user_devices(&user_id).await?;
1438            let any_unverified = devices.devices().any(|d| !d.is_verified());
1439
1440            if any_unverified {
1441                return Ok(false);
1442            }
1443        }
1444
1445        Ok(true)
1446    }
1447
1448    /// Set the given account data event for this room.
1449    ///
1450    /// # Example
1451    /// ```
1452    /// # async {
1453    /// # let room: matrix_sdk::Room = todo!();
1454    /// # let event_id: ruma::OwnedEventId = todo!();
1455    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1456    /// let content = FullyReadEventContent::new(event_id);
1457    ///
1458    /// room.set_account_data(content).await?;
1459    /// # anyhow::Ok(())
1460    /// # };
1461    /// ```
1462    pub async fn set_account_data<T>(
1463        &self,
1464        content: T,
1465    ) -> Result<set_room_account_data::v3::Response>
1466    where
1467        T: RoomAccountDataEventContent,
1468    {
1469        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1470
1471        let request = set_room_account_data::v3::Request::new(
1472            own_user.to_owned(),
1473            self.room_id().to_owned(),
1474            &content,
1475        )?;
1476
1477        Ok(self.client.send(request).await?)
1478    }
1479
1480    /// Set the given raw account data event in this room.
1481    ///
1482    /// # Example
1483    /// ```
1484    /// # async {
1485    /// # let room: matrix_sdk::Room = todo!();
1486    /// use matrix_sdk::ruma::{
1487    ///     events::{
1488    ///         AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1489    ///         marked_unread::MarkedUnreadEventContent,
1490    ///     },
1491    ///     serde::Raw,
1492    /// };
1493    /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1494    /// let full_event: AnyRoomAccountDataEventContent =
1495    ///     marked_unread_content.clone().into();
1496    /// room.set_account_data_raw(
1497    ///     marked_unread_content.event_type(),
1498    ///     Raw::new(&full_event).unwrap(),
1499    /// )
1500    /// .await?;
1501    /// # anyhow::Ok(())
1502    /// # };
1503    /// ```
1504    pub async fn set_account_data_raw(
1505        &self,
1506        event_type: RoomAccountDataEventType,
1507        content: Raw<AnyRoomAccountDataEventContent>,
1508    ) -> Result<set_room_account_data::v3::Response> {
1509        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1510
1511        let request = set_room_account_data::v3::Request::new_raw(
1512            own_user.to_owned(),
1513            self.room_id().to_owned(),
1514            event_type,
1515            content,
1516        );
1517
1518        Ok(self.client.send(request).await?)
1519    }
1520
1521    /// Adds a tag to the room, or updates it if it already exists.
1522    ///
1523    /// Returns the [`create_tag::v3::Response`] from the server.
1524    ///
1525    /// # Arguments
1526    /// * `tag` - The tag to add or update.
1527    ///
1528    /// * `tag_info` - Information about the tag, generally containing the
1529    ///   `order` parameter.
1530    ///
1531    /// # Examples
1532    ///
1533    /// ```no_run
1534    /// # use std::str::FromStr;
1535    /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1536    /// # async {
1537    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1538    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1539    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1540    /// use matrix_sdk::ruma::events::tag::TagInfo;
1541    ///
1542    /// if let Some(room) = client.get_room(&room_id) {
1543    ///     let mut tag_info = TagInfo::new();
1544    ///     tag_info.order = Some(0.9);
1545    ///     let user_tag = UserTagName::from_str("u.work")?;
1546    ///
1547    ///     room.set_tag(TagName::User(user_tag), tag_info).await?;
1548    /// }
1549    /// # anyhow::Ok(()) };
1550    /// ```
1551    pub async fn set_tag(
1552        &self,
1553        tag: TagName,
1554        tag_info: TagInfo,
1555    ) -> Result<create_tag::v3::Response> {
1556        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1557        let request = create_tag::v3::Request::new(
1558            user_id.to_owned(),
1559            self.inner.room_id().to_owned(),
1560            tag.to_string(),
1561            tag_info,
1562        );
1563        Ok(self.client.send(request).await?)
1564    }
1565
1566    /// Removes a tag from the room.
1567    ///
1568    /// Returns the [`delete_tag::v3::Response`] from the server.
1569    ///
1570    /// # Arguments
1571    /// * `tag` - The tag to remove.
1572    pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1573        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1574        let request = delete_tag::v3::Request::new(
1575            user_id.to_owned(),
1576            self.inner.room_id().to_owned(),
1577            tag.to_string(),
1578        );
1579        Ok(self.client.send(request).await?)
1580    }
1581
1582    /// Add or remove the `m.favourite` flag for this room.
1583    ///
1584    /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1585    /// room, the tag will be removed too.
1586    ///
1587    /// # Arguments
1588    ///
1589    /// * `is_favourite` - Whether to mark this room as favourite.
1590    /// * `tag_order` - The order of the tag if any.
1591    pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1592        if is_favourite {
1593            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1594
1595            self.set_tag(TagName::Favorite, tag_info).await?;
1596
1597            if self.is_low_priority() {
1598                self.remove_tag(TagName::LowPriority).await?;
1599            }
1600        } else {
1601            self.remove_tag(TagName::Favorite).await?;
1602        }
1603        Ok(())
1604    }
1605
1606    /// Add or remove the `m.lowpriority` flag for this room.
1607    ///
1608    /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1609    /// room, the tag will be removed too.
1610    ///
1611    /// # Arguments
1612    ///
1613    /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1614    /// * `tag_order` - The order of the tag if any.
1615    pub async fn set_is_low_priority(
1616        &self,
1617        is_low_priority: bool,
1618        tag_order: Option<f64>,
1619    ) -> Result<()> {
1620        if is_low_priority {
1621            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1622
1623            self.set_tag(TagName::LowPriority, tag_info).await?;
1624
1625            if self.is_favourite() {
1626                self.remove_tag(TagName::Favorite).await?;
1627            }
1628        } else {
1629            self.remove_tag(TagName::LowPriority).await?;
1630        }
1631        Ok(())
1632    }
1633
1634    /// Sets whether this room is a DM.
1635    ///
1636    /// When setting this room as DM, it will be marked as DM for all active
1637    /// members of the room. When unsetting this room as DM, it will be
1638    /// unmarked as DM for all users, not just the members.
1639    ///
1640    /// # Arguments
1641    /// * `is_direct` - Whether to mark this room as direct.
1642    pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1643        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1644
1645        let mut content = self
1646            .client
1647            .account()
1648            .account_data::<DirectEventContent>()
1649            .await?
1650            .map(|c| c.deserialize())
1651            .transpose()?
1652            .unwrap_or_default();
1653
1654        let this_room_id = self.inner.room_id();
1655
1656        if is_direct {
1657            let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1658            room_members.retain(|member| member.user_id() != self.own_user_id());
1659
1660            for member in room_members {
1661                let entry = content.entry(member.user_id().into()).or_default();
1662                if !entry.iter().any(|room_id| room_id == this_room_id) {
1663                    entry.push(this_room_id.to_owned());
1664                }
1665            }
1666        } else {
1667            for (_, list) in content.iter_mut() {
1668                list.retain(|room_id| *room_id != this_room_id);
1669            }
1670
1671            // Remove user ids that don't have any room marked as DM
1672            content.retain(|_, list| !list.is_empty());
1673        }
1674
1675        let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1676
1677        self.client.send(request).await?;
1678        Ok(())
1679    }
1680
1681    /// Tries to decrypt a room event.
1682    ///
1683    /// # Arguments
1684    /// * `event` - The room event to be decrypted.
1685    ///
1686    /// Returns the decrypted event. In the case of a decryption error, returns
1687    /// a `TimelineEvent` representing the decryption error.
1688    #[cfg(feature = "e2e-encryption")]
1689    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1690    pub async fn decrypt_event(
1691        &self,
1692        event: &Raw<OriginalSyncRoomEncryptedEvent>,
1693        push_ctx: Option<&PushContext>,
1694    ) -> Result<TimelineEvent> {
1695        let machine = self.client.olm_machine().await;
1696        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1697
1698        match machine
1699            .try_decrypt_room_event(
1700                event.cast_ref(),
1701                self.inner.room_id(),
1702                self.client.decryption_settings(),
1703            )
1704            .await?
1705        {
1706            RoomEventDecryptionResult::Decrypted(decrypted) => {
1707                let push_actions = if let Some(push_ctx) = push_ctx {
1708                    Some(push_ctx.for_event(&decrypted.event).await)
1709                } else {
1710                    None
1711                };
1712                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1713            }
1714            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1715                self.client
1716                    .encryption()
1717                    .backups()
1718                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1719                Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1720            }
1721        }
1722    }
1723
1724    /// Tries to decrypt a room event.
1725    ///
1726    /// # Arguments
1727    /// * `event` - The room event to be decrypted.
1728    ///
1729    /// Returns the decrypted event. In the case of a decryption error, returns
1730    /// a `TimelineEvent` representing the decryption error.
1731    #[cfg(feature = "experimental-encrypted-state-events")]
1732    pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1733        &self,
1734        event: &Raw<T>,
1735        push_ctx: Option<&PushContext>,
1736    ) -> Result<TimelineEvent> {
1737        let machine = self.client.olm_machine().await;
1738        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1739
1740        match machine
1741            .try_decrypt_room_event(
1742                event.cast_ref(),
1743                self.inner.room_id(),
1744                self.client.decryption_settings(),
1745            )
1746            .await?
1747        {
1748            RoomEventDecryptionResult::Decrypted(decrypted) => {
1749                let push_actions = if let Some(push_ctx) = push_ctx {
1750                    Some(push_ctx.for_event(&decrypted.event).await)
1751                } else {
1752                    None
1753                };
1754                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1755            }
1756            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1757                self.client
1758                    .encryption()
1759                    .backups()
1760                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1761                // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1762                // event.
1763                Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1764            }
1765        }
1766    }
1767
1768    /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1769    /// session_id.
1770    ///
1771    /// This may be used when we receive an update for a session, and we want to
1772    /// reflect the changes in messages we have received that were encrypted
1773    /// with that session, e.g. to remove a warning shield because a device is
1774    /// now verified.
1775    ///
1776    /// # Arguments
1777    /// * `session_id` - The ID of the Megolm session to get information for.
1778    /// * `sender` - The (claimed) sender of the event where the session was
1779    ///   used.
1780    #[cfg(feature = "e2e-encryption")]
1781    pub async fn get_encryption_info(
1782        &self,
1783        session_id: &str,
1784        sender: &UserId,
1785    ) -> Option<Arc<EncryptionInfo>> {
1786        let machine = self.client.olm_machine().await;
1787        let machine = machine.as_ref()?;
1788        machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1789    }
1790
1791    /// Forces the currently active room key, which is used to encrypt messages,
1792    /// to be rotated.
1793    ///
1794    /// A new room key will be crated and shared with all the room members the
1795    /// next time a message will be sent. You don't have to call this method,
1796    /// room keys will be rotated automatically when necessary. This method is
1797    /// still useful for debugging purposes.
1798    ///
1799    /// For more info please take a look a the [`encryption`] module
1800    /// documentation.
1801    ///
1802    /// [`encryption`]: crate::encryption
1803    #[cfg(feature = "e2e-encryption")]
1804    pub async fn discard_room_key(&self) -> Result<()> {
1805        let machine = self.client.olm_machine().await;
1806        if let Some(machine) = machine.as_ref() {
1807            machine.discard_room_key(self.inner.room_id()).await?;
1808            Ok(())
1809        } else {
1810            Err(Error::NoOlmMachine)
1811        }
1812    }
1813
1814    /// Ban the user with `UserId` from this room.
1815    ///
1816    /// # Arguments
1817    ///
1818    /// * `user_id` - The user to ban with `UserId`.
1819    ///
1820    /// * `reason` - The reason for banning this user.
1821    #[instrument(skip_all)]
1822    pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1823        let request = assign!(
1824            ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1825            { reason: reason.map(ToOwned::to_owned) }
1826        );
1827        self.client.send(request).await?;
1828        Ok(())
1829    }
1830
1831    /// Unban the user with `UserId` from this room.
1832    ///
1833    /// # Arguments
1834    ///
1835    /// * `user_id` - The user to unban with `UserId`.
1836    ///
1837    /// * `reason` - The reason for unbanning this user.
1838    #[instrument(skip_all)]
1839    pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1840        let request = assign!(
1841            unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1842            { reason: reason.map(ToOwned::to_owned) }
1843        );
1844        self.client.send(request).await?;
1845        Ok(())
1846    }
1847
1848    /// Kick a user out of this room.
1849    ///
1850    /// # Arguments
1851    ///
1852    /// * `user_id` - The `UserId` of the user that should be kicked out of the
1853    ///   room.
1854    ///
1855    /// * `reason` - Optional reason why the room member is being kicked out.
1856    #[instrument(skip_all)]
1857    pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1858        let request = assign!(
1859            kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1860            { reason: reason.map(ToOwned::to_owned) }
1861        );
1862        self.client.send(request).await?;
1863        Ok(())
1864    }
1865
1866    /// Invite the specified user by `UserId` to this room.
1867    ///
1868    /// # Arguments
1869    ///
1870    /// * `user_id` - The `UserId` of the user to invite to the room.
1871    #[instrument(skip_all)]
1872    pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1873        #[cfg(feature = "e2e-encryption")]
1874        if self.client.inner.enable_share_history_on_invite {
1875            shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1876        }
1877
1878        let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1879        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1880        self.client.send(request).await?;
1881
1882        // Force a future room members reload before sending any event to prevent UTDs
1883        // that can happen when some event is sent after a room member has been invited
1884        // but before the /sync request could fetch the membership change event.
1885        self.mark_members_missing();
1886
1887        Ok(())
1888    }
1889
1890    /// Invite the specified user by third party id to this room.
1891    ///
1892    /// # Arguments
1893    ///
1894    /// * `invite_id` - A third party id of a user to invite to the room.
1895    #[instrument(skip_all)]
1896    pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1897        let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1898        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1899        self.client.send(request).await?;
1900
1901        // Force a future room members reload before sending any event to prevent UTDs
1902        // that can happen when some event is sent after a room member has been invited
1903        // but before the /sync request could fetch the membership change event.
1904        self.mark_members_missing();
1905
1906        Ok(())
1907    }
1908
1909    /// Activate typing notice for this room.
1910    ///
1911    /// The typing notice remains active for 4s. It can be deactivate at any
1912    /// point by setting typing to `false`. If this method is called while
1913    /// the typing notice is active nothing will happen. This method can be
1914    /// called on every key stroke, since it will do nothing while typing is
1915    /// active.
1916    ///
1917    /// # Arguments
1918    ///
1919    /// * `typing` - Whether the user is typing or has stopped typing.
1920    ///
1921    /// # Examples
1922    ///
1923    /// ```no_run
1924    /// use std::time::Duration;
1925    ///
1926    /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
1927    /// # use matrix_sdk::{
1928    /// #     Client, config::SyncSettings,
1929    /// #     ruma::room_id,
1930    /// # };
1931    /// # use url::Url;
1932    ///
1933    /// # async {
1934    /// # let homeserver = Url::parse("http://localhost:8080")?;
1935    /// # let client = Client::new(homeserver).await?;
1936    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
1937    ///
1938    /// if let Some(room) = client.get_room(&room_id) {
1939    ///     room.typing_notice(true).await?
1940    /// }
1941    /// # anyhow::Ok(()) };
1942    /// ```
1943    pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1944        self.ensure_room_joined()?;
1945
1946        // Only send a request to the homeserver if the old timeout has elapsed
1947        // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
1948        let send = if let Some(typing_time) =
1949            self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1950        {
1951            if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1952                // We always reactivate the typing notice if typing is true or
1953                // we may need to deactivate it if it's
1954                // currently active if typing is false
1955                typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1956            } else {
1957                // Only send a request when we need to deactivate typing
1958                !typing
1959            }
1960        } else {
1961            // Typing notice is currently deactivated, therefore, send a request
1962            // only when it's about to be activated
1963            typing
1964        };
1965
1966        if send {
1967            self.send_typing_notice(typing).await?;
1968        }
1969
1970        Ok(())
1971    }
1972
1973    #[instrument(name = "typing_notice", skip(self))]
1974    async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1975        let typing = if typing {
1976            self.client
1977                .inner
1978                .typing_notice_times
1979                .write()
1980                .unwrap()
1981                .insert(self.room_id().to_owned(), Instant::now());
1982            Typing::Yes(TYPING_NOTICE_TIMEOUT)
1983        } else {
1984            self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1985            Typing::No
1986        };
1987
1988        let request = create_typing_event::v3::Request::new(
1989            self.own_user_id().to_owned(),
1990            self.room_id().to_owned(),
1991            typing,
1992        );
1993
1994        self.client.send(request).await?;
1995
1996        Ok(())
1997    }
1998
1999    /// Send a request to set a single receipt.
2000    ///
2001    /// If an unthreaded receipt is sent, this will also unset the unread flag
2002    /// of the room if necessary.
2003    ///
2004    /// # Arguments
2005    ///
2006    /// * `receipt_type` - The type of the receipt to set. Note that it is
2007    ///   possible to set the fully-read marker although it is technically not a
2008    ///   receipt.
2009    ///
2010    /// * `thread` - The thread where this receipt should apply, if any. Note
2011    ///   that this must be [`ReceiptThread::Unthreaded`] when sending a
2012    ///   [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
2013    ///
2014    /// * `event_id` - The `EventId` of the event to set the receipt on.
2015    #[instrument(skip_all)]
2016    pub async fn send_single_receipt(
2017        &self,
2018        receipt_type: create_receipt::v3::ReceiptType,
2019        thread: ReceiptThread,
2020        event_id: OwnedEventId,
2021    ) -> Result<()> {
2022        // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
2023        // string key.
2024        let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2025
2026        self.client
2027            .inner
2028            .locks
2029            .read_receipt_deduplicated_handler
2030            .run((request_key, event_id.clone()), async {
2031                // We will unset the unread flag if we send an unthreaded receipt.
2032                let is_unthreaded = thread == ReceiptThread::Unthreaded;
2033
2034                let mut request = create_receipt::v3::Request::new(
2035                    self.room_id().to_owned(),
2036                    receipt_type,
2037                    event_id,
2038                );
2039                request.thread = thread;
2040
2041                self.client.send(request).await?;
2042
2043                if is_unthreaded {
2044                    self.set_unread_flag(false).await?;
2045                }
2046
2047                Ok(())
2048            })
2049            .await
2050    }
2051
2052    /// Send a request to set multiple receipts at once.
2053    ///
2054    /// This will also unset the unread flag of the room if necessary.
2055    ///
2056    /// # Arguments
2057    ///
2058    /// * `receipts` - The `Receipts` to send.
2059    ///
2060    /// If `receipts` is empty, this is a no-op.
2061    #[instrument(skip_all)]
2062    pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2063        if receipts.is_empty() {
2064            return Ok(());
2065        }
2066
2067        let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2068        let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2069            fully_read,
2070            read_receipt: public_read_receipt,
2071            private_read_receipt,
2072        });
2073
2074        self.client.send(request).await?;
2075
2076        self.set_unread_flag(false).await?;
2077
2078        Ok(())
2079    }
2080
2081    /// Helper function to enable End-to-end encryption in this room.
2082    /// `encrypted_state_events` is not used unless the
2083    /// `experimental-encrypted-state-events` feature is enabled.
2084    #[allow(unused_variables, unused_mut)]
2085    async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2086        use ruma::{
2087            EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2088        };
2089        const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2090
2091        if !self.latest_encryption_state().await?.is_encrypted() {
2092            let mut content =
2093                RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2094            #[cfg(feature = "experimental-encrypted-state-events")]
2095            if encrypted_state_events {
2096                content = content.with_encrypted_state();
2097            }
2098            self.send_state_event(content).await?;
2099
2100            // Spin on the sync beat event, since the first sync we receive might not
2101            // include the encryption event.
2102            //
2103            // TODO do we want to return an error here if we time out? This
2104            // could be quite useful if someone wants to enable encryption and
2105            // send a message right after it's enabled.
2106            let res = timeout(
2107                async {
2108                    loop {
2109                        // Listen for sync events, then check if the encryption state is known.
2110                        self.client.inner.sync_beat.listen().await;
2111                        let _state_store_lock =
2112                            self.client.base_client().state_store_lock().lock().await;
2113
2114                        if !self.inner.encryption_state().is_unknown() {
2115                            break;
2116                        }
2117                    }
2118                },
2119                SYNC_WAIT_TIME,
2120            )
2121            .await;
2122
2123            let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2124
2125            // If encryption was enabled, return.
2126            #[cfg(not(feature = "experimental-encrypted-state-events"))]
2127            if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2128                debug!("room successfully marked as encrypted");
2129                return Ok(());
2130            }
2131
2132            // If encryption with state event encryption was enabled, return.
2133            #[cfg(feature = "experimental-encrypted-state-events")]
2134            if res.is_ok() && {
2135                if encrypted_state_events {
2136                    self.inner.encryption_state().is_state_encrypted()
2137                } else {
2138                    self.inner.encryption_state().is_encrypted()
2139                }
2140            } {
2141                debug!("room successfully marked as encrypted");
2142                return Ok(());
2143            }
2144
2145            // If after waiting for multiple syncs, we don't have the encryption state we
2146            // expect, assume the local encryption state is incorrect; this will
2147            // cause the SDK to re-request it later for confirmation, instead of
2148            // assuming it's sync'd and correct (and not encrypted).
2149            debug!("still not marked as encrypted, marking encryption state as missing");
2150
2151            let mut room_info = self.clone_info();
2152            room_info.mark_encryption_state_missing();
2153            let mut changes = StateChanges::default();
2154            changes.add_room(room_info.clone());
2155
2156            self.client.state_store().save_changes(&changes).await?;
2157            self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2158        }
2159
2160        Ok(())
2161    }
2162
2163    /// Enable End-to-end encryption in this room.
2164    ///
2165    /// This method will be a noop if encryption is already enabled, otherwise
2166    /// sends a `m.room.encryption` state event to the room. This might fail if
2167    /// you don't have the appropriate power level to enable end-to-end
2168    /// encryption.
2169    ///
2170    /// A sync needs to be received to update the local room state. This method
2171    /// will wait for a sync to be received, this might time out if no
2172    /// sync loop is running or if the server is slow.
2173    ///
2174    /// # Examples
2175    ///
2176    /// ```no_run
2177    /// # use matrix_sdk::{
2178    /// #     Client, config::SyncSettings,
2179    /// #     ruma::room_id,
2180    /// # };
2181    /// # use url::Url;
2182    /// #
2183    /// # async {
2184    /// # let homeserver = Url::parse("http://localhost:8080")?;
2185    /// # let client = Client::new(homeserver).await?;
2186    /// # let room_id = room_id!("!test:localhost");
2187    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2188    ///
2189    /// if let Some(room) = client.get_room(&room_id) {
2190    ///     room.enable_encryption().await?
2191    /// }
2192    /// # anyhow::Ok(()) };
2193    /// ```
2194    #[instrument(skip_all)]
2195    pub async fn enable_encryption(&self) -> Result<()> {
2196        self.enable_encryption_inner(false).await
2197    }
2198
2199    /// Enable End-to-end encryption in this room, opting into experimental
2200    /// state event encryption.
2201    ///
2202    /// This method will be a noop if encryption is already enabled, otherwise
2203    /// sends a `m.room.encryption` state event to the room. This might fail if
2204    /// you don't have the appropriate power level to enable end-to-end
2205    /// encryption.
2206    ///
2207    /// A sync needs to be received to update the local room state. This method
2208    /// will wait for a sync to be received, this might time out if no
2209    /// sync loop is running or if the server is slow.
2210    ///
2211    /// # Examples
2212    ///
2213    /// ```no_run
2214    /// # use matrix_sdk::{
2215    /// #     Client, config::SyncSettings,
2216    /// #     ruma::room_id,
2217    /// # };
2218    /// # use url::Url;
2219    /// #
2220    /// # async {
2221    /// # let homeserver = Url::parse("http://localhost:8080")?;
2222    /// # let client = Client::new(homeserver).await?;
2223    /// # let room_id = room_id!("!test:localhost");
2224    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2225    ///
2226    /// if let Some(room) = client.get_room(&room_id) {
2227    ///     room.enable_encryption_with_state_event_encryption().await?
2228    /// }
2229    /// # anyhow::Ok(()) };
2230    /// ```
2231    #[instrument(skip_all)]
2232    #[cfg(feature = "experimental-encrypted-state-events")]
2233    pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2234        self.enable_encryption_inner(true).await
2235    }
2236
2237    /// Share a room key with users in the given room.
2238    ///
2239    /// This will create Olm sessions with all the users/device pairs in the
2240    /// room if necessary and share a room key that can be shared with them.
2241    ///
2242    /// Does nothing if no room key needs to be shared.
2243    // TODO: expose this publicly so people can pre-share a group session if
2244    // e.g. a user starts to type a message for a room.
2245    #[cfg(feature = "e2e-encryption")]
2246    #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
2247    async fn preshare_room_key(&self) -> Result<()> {
2248        self.ensure_room_joined()?;
2249
2250        // Take and release the lock on the store, if needs be.
2251        let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2252        tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
2253
2254        self.client
2255            .locks()
2256            .group_session_deduplicated_handler
2257            .run(self.room_id().to_owned(), async move {
2258                {
2259                    let members = self
2260                        .client
2261                        .state_store()
2262                        .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2263                        .await?;
2264                    self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2265                };
2266
2267                let response = self.share_room_key().await;
2268
2269                // If one of the responses failed invalidate the group
2270                // session as using it would end up in undecryptable
2271                // messages.
2272                if let Err(r) = response {
2273                    let machine = self.client.olm_machine().await;
2274                    if let Some(machine) = machine.as_ref() {
2275                        machine.discard_room_key(self.room_id()).await?;
2276                    }
2277                    return Err(r);
2278                }
2279
2280                Ok(())
2281            })
2282            .await
2283    }
2284
2285    /// Share a group session for a room.
2286    ///
2287    /// # Panics
2288    ///
2289    /// Panics if the client isn't logged in.
2290    #[cfg(feature = "e2e-encryption")]
2291    #[instrument(skip_all)]
2292    async fn share_room_key(&self) -> Result<()> {
2293        self.ensure_room_joined()?;
2294
2295        let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2296
2297        for request in requests {
2298            let response = self.client.send_to_device(&request).await?;
2299            self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2300        }
2301
2302        Ok(())
2303    }
2304
2305    /// Wait for the room to be fully synced.
2306    ///
2307    /// This method makes sure the room that was returned when joining a room
2308    /// has been echoed back in the sync.
2309    ///
2310    /// Warning: This waits until a sync happens and does not return if no sync
2311    /// is happening. It can also return early when the room is not a joined
2312    /// room anymore.
2313    #[instrument(skip_all)]
2314    pub async fn sync_up(&self) {
2315        while !self.is_synced() && self.state() == RoomState::Joined {
2316            let wait_for_beat = self.client.inner.sync_beat.listen();
2317            // We don't care whether it's a timeout or a sync beat.
2318            let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2319        }
2320    }
2321
2322    /// Send a message-like event to this room.
2323    ///
2324    /// Returns the parsed response from the server.
2325    ///
2326    /// If the encryption feature is enabled this method will transparently
2327    /// encrypt the event if this room is encrypted (except for `m.reaction`
2328    /// events, which are never encrypted).
2329    ///
2330    /// **Note**: If you just want to send an event with custom JSON content to
2331    /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2332    ///
2333    /// If you want to set a transaction ID for the event, use
2334    /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2335    /// on the returned value before `.await`ing it.
2336    ///
2337    /// # Arguments
2338    ///
2339    /// * `content` - The content of the message event.
2340    ///
2341    /// # Examples
2342    ///
2343    /// ```no_run
2344    /// # use std::sync::{Arc, RwLock};
2345    /// # use matrix_sdk::{Client, config::SyncSettings};
2346    /// # use url::Url;
2347    /// # use matrix_sdk::ruma::room_id;
2348    /// # use serde::{Deserialize, Serialize};
2349    /// use matrix_sdk::ruma::{
2350    ///     MilliSecondsSinceUnixEpoch, TransactionId,
2351    ///     events::{
2352    ///         macros::EventContent,
2353    ///         room::message::{RoomMessageEventContent, TextMessageEventContent},
2354    ///     },
2355    ///     uint,
2356    /// };
2357    ///
2358    /// # async {
2359    /// # let homeserver = Url::parse("http://localhost:8080")?;
2360    /// # let mut client = Client::new(homeserver).await?;
2361    /// # let room_id = room_id!("!test:localhost");
2362    /// let content = RoomMessageEventContent::text_plain("Hello world");
2363    /// let txn_id = TransactionId::new();
2364    ///
2365    /// if let Some(room) = client.get_room(&room_id) {
2366    ///     room.send(content).with_transaction_id(txn_id).await?;
2367    /// }
2368    ///
2369    /// // Custom events work too:
2370    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2371    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2372    /// struct TokenEventContent {
2373    ///     token: String,
2374    ///     #[serde(rename = "exp")]
2375    ///     expires_at: MilliSecondsSinceUnixEpoch,
2376    /// }
2377    ///
2378    /// # fn generate_token() -> String { todo!() }
2379    /// let content = TokenEventContent {
2380    ///     token: generate_token(),
2381    ///     expires_at: {
2382    ///         let now = MilliSecondsSinceUnixEpoch::now();
2383    ///         MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2384    ///     },
2385    /// };
2386    ///
2387    /// if let Some(room) = client.get_room(&room_id) {
2388    ///     room.send(content).await?;
2389    /// }
2390    /// # anyhow::Ok(()) };
2391    /// ```
2392    pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2393        SendMessageLikeEvent::new(self, content)
2394    }
2395
2396    /// Run /keys/query requests for all the non-tracked users, and for users
2397    /// with an out-of-date device list.
2398    #[cfg(feature = "e2e-encryption")]
2399    async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2400        let olm = self.client.olm_machine().await;
2401        let olm = olm.as_ref().expect("Olm machine wasn't started");
2402
2403        let members =
2404            self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2405
2406        let tracked: HashMap<_, _> = olm
2407            .store()
2408            .load_tracked_users()
2409            .await?
2410            .into_iter()
2411            .map(|tracked| (tracked.user_id, tracked.dirty))
2412            .collect();
2413
2414        // A member has no unknown devices iff it was tracked *and* the tracking is
2415        // not considered dirty.
2416        let members_with_unknown_devices =
2417            members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2418
2419        let (req_id, request) =
2420            olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2421
2422        if !request.device_keys.is_empty() {
2423            self.client.keys_query(&req_id, request.device_keys).await?;
2424        }
2425
2426        Ok(())
2427    }
2428
2429    /// Send a message-like event with custom JSON content to this room.
2430    ///
2431    /// Returns the parsed response from the server.
2432    ///
2433    /// If the encryption feature is enabled this method will transparently
2434    /// encrypt the event if this room is encrypted (except for `m.reaction`
2435    /// events, which are never encrypted).
2436    ///
2437    /// This method is equivalent to the [`send()`][Self::send] method but
2438    /// allows sending custom JSON payloads, e.g. constructed using the
2439    /// [`serde_json::json!()`] macro.
2440    ///
2441    /// If you want to set a transaction ID for the event, use
2442    /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2443    /// on the returned value before `.await`ing it.
2444    ///
2445    /// # Arguments
2446    ///
2447    /// * `event_type` - The type of the event.
2448    ///
2449    /// * `content` - The content of the event as a raw JSON value. The argument
2450    ///   type can be `serde_json::Value`, but also other raw JSON types; for
2451    ///   the full list check the documentation of
2452    ///   [`IntoRawMessageLikeEventContent`].
2453    ///
2454    /// # Examples
2455    ///
2456    /// ```no_run
2457    /// # use std::sync::{Arc, RwLock};
2458    /// # use matrix_sdk::{Client, config::SyncSettings};
2459    /// # use url::Url;
2460    /// # use matrix_sdk::ruma::room_id;
2461    /// # async {
2462    /// # let homeserver = Url::parse("http://localhost:8080")?;
2463    /// # let mut client = Client::new(homeserver).await?;
2464    /// # let room_id = room_id!("!test:localhost");
2465    /// use serde_json::json;
2466    ///
2467    /// if let Some(room) = client.get_room(&room_id) {
2468    ///     room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2469    /// }
2470    /// # anyhow::Ok(()) };
2471    /// ```
2472    #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2473    pub fn send_raw<'a>(
2474        &'a self,
2475        event_type: &'a str,
2476        content: impl IntoRawMessageLikeEventContent,
2477    ) -> SendRawMessageLikeEvent<'a> {
2478        // Note: the recorded instrument fields are saved in
2479        // `SendRawMessageLikeEvent::into_future`.
2480        SendRawMessageLikeEvent::new(self, event_type, content)
2481    }
2482
2483    /// Send an attachment to this room.
2484    ///
2485    /// This will upload the given data that the reader produces using the
2486    /// [`upload()`] method and post an event to the given room.
2487    /// If the room is encrypted and the encryption feature is enabled the
2488    /// upload will be encrypted.
2489    ///
2490    /// This is a convenience method that calls the
2491    /// [`upload()`] and afterwards the [`send()`].
2492    ///
2493    /// # Arguments
2494    /// * `filename` - The file name.
2495    ///
2496    /// * `content_type` - The type of the media, this will be used as the
2497    /// content-type header.
2498    ///
2499    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2500    /// media.
2501    ///
2502    /// * `config` - Metadata and configuration for the attachment.
2503    ///
2504    /// # Examples
2505    ///
2506    /// ```no_run
2507    /// # use std::fs;
2508    /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2509    /// # use url::Url;
2510    /// # use mime;
2511    /// # async {
2512    /// # let homeserver = Url::parse("http://localhost:8080")?;
2513    /// # let mut client = Client::new(homeserver).await?;
2514    /// # let room_id = room_id!("!test:localhost");
2515    /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2516    ///
2517    /// if let Some(room) = client.get_room(&room_id) {
2518    ///     room.send_attachment(
2519    ///         "my_favorite_cat.jpg",
2520    ///         &mime::IMAGE_JPEG,
2521    ///         image,
2522    ///         AttachmentConfig::new(),
2523    ///     ).await?;
2524    /// }
2525    /// # anyhow::Ok(()) };
2526    /// ```
2527    ///
2528    /// [`upload()`]: crate::Media::upload
2529    /// [`send()`]: Self::send
2530    #[instrument(skip_all)]
2531    pub fn send_attachment<'a>(
2532        &'a self,
2533        filename: impl Into<String>,
2534        content_type: &'a Mime,
2535        data: Vec<u8>,
2536        config: AttachmentConfig,
2537    ) -> SendAttachment<'a> {
2538        SendAttachment::new(self, filename.into(), content_type, data, config)
2539    }
2540
2541    /// Prepare and send an attachment to this room.
2542    ///
2543    /// This will upload the given data that the reader produces using the
2544    /// [`upload()`](#method.upload) method and post an event to the given room.
2545    /// If the room is encrypted and the encryption feature is enabled the
2546    /// upload will be encrypted.
2547    ///
2548    /// This is a convenience method that calls the
2549    /// [`Client::upload()`](#Client::method.upload) and afterwards the
2550    /// [`send()`](#method.send).
2551    ///
2552    /// # Arguments
2553    /// * `filename` - The file name.
2554    ///
2555    /// * `content_type` - The type of the media, this will be used as the
2556    ///   content-type header.
2557    ///
2558    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2559    ///   media.
2560    ///
2561    /// * `config` - Metadata and configuration for the attachment.
2562    ///
2563    /// * `send_progress` - An observable to transmit forward progress about the
2564    ///   upload.
2565    ///
2566    /// * `store_in_cache` - A boolean defining whether the uploaded media will
2567    ///   be stored in the cache immediately after a successful upload.
2568    #[instrument(skip_all)]
2569    pub(super) async fn prepare_and_send_attachment<'a>(
2570        &'a self,
2571        filename: String,
2572        content_type: &'a Mime,
2573        data: Vec<u8>,
2574        mut config: AttachmentConfig,
2575        send_progress: SharedObservable<TransmissionProgress>,
2576        store_in_cache: bool,
2577    ) -> Result<send_message_event::v3::Response> {
2578        self.ensure_room_joined()?;
2579
2580        let txn_id = config.txn_id.take();
2581        let mentions = config.mentions.take();
2582
2583        let thumbnail = config.thumbnail.take();
2584
2585        // If necessary, store caching data for the thumbnail ahead of time.
2586        let thumbnail_cache_info = if store_in_cache {
2587            thumbnail
2588                .as_ref()
2589                .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2590        } else {
2591            None
2592        };
2593
2594        #[cfg(feature = "e2e-encryption")]
2595        let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2596            self.client
2597                .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2598                .await?
2599        } else {
2600            self.client
2601                .media()
2602                .upload_plain_media_and_thumbnail(
2603                    content_type,
2604                    // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2605                    // similar.
2606                    data.clone(),
2607                    thumbnail,
2608                    send_progress,
2609                )
2610                .await?
2611        };
2612
2613        #[cfg(not(feature = "e2e-encryption"))]
2614        let (media_source, thumbnail) = self
2615            .client
2616            .media()
2617            .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2618            .await?;
2619
2620        if store_in_cache {
2621            let media_store_lock_guard = self.client.media_store().lock().await?;
2622
2623            // A failure to cache shouldn't prevent the whole upload from finishing
2624            // properly, so only log errors during caching.
2625
2626            debug!("caching the media");
2627            let request =
2628                MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2629
2630            if let Err(err) = media_store_lock_guard
2631                .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2632                .await
2633            {
2634                warn!("unable to cache the media after uploading it: {err}");
2635            }
2636
2637            if let Some(((data, height, width), source)) =
2638                thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2639            {
2640                debug!("caching the thumbnail");
2641
2642                let request = MediaRequestParameters {
2643                    source: source.clone(),
2644                    format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2645                };
2646
2647                if let Err(err) = media_store_lock_guard
2648                    .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2649                    .await
2650                {
2651                    warn!("unable to cache the media after uploading it: {err}");
2652                }
2653            }
2654        }
2655
2656        let content = self
2657            .make_media_event(
2658                Room::make_attachment_type(
2659                    content_type,
2660                    filename,
2661                    media_source,
2662                    config.caption,
2663                    config.info,
2664                    thumbnail,
2665                ),
2666                mentions,
2667                config.reply,
2668            )
2669            .await?;
2670
2671        let mut fut = self.send(content);
2672        if let Some(txn_id) = txn_id {
2673            fut = fut.with_transaction_id(txn_id);
2674        }
2675
2676        fut.await.map(|result| result.response)
2677    }
2678
2679    /// Creates the inner [`MessageType`] for an already-uploaded media file
2680    /// provided by its source.
2681    #[allow(clippy::too_many_arguments)]
2682    pub(crate) fn make_attachment_type(
2683        content_type: &Mime,
2684        filename: String,
2685        source: MediaSource,
2686        caption: Option<TextMessageEventContent>,
2687        info: Option<AttachmentInfo>,
2688        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2689    ) -> MessageType {
2690        make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2691    }
2692
2693    /// Creates the [`RoomMessageEventContent`] based on the message type,
2694    /// mentions and reply information.
2695    pub(crate) async fn make_media_event(
2696        &self,
2697        msg_type: MessageType,
2698        mentions: Option<Mentions>,
2699        reply: Option<Reply>,
2700    ) -> Result<RoomMessageEventContent> {
2701        let mut content = RoomMessageEventContent::new(msg_type);
2702        if let Some(mentions) = mentions {
2703            content = content.add_mentions(mentions);
2704        }
2705        if let Some(reply) = reply {
2706            // Since we just created the event, there is no relation attached to it. Thus,
2707            // it is safe to add the reply relation without overriding anything.
2708            content = self.make_reply_event(content.into(), reply).await?;
2709        }
2710        Ok(content)
2711    }
2712
2713    /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
2714    /// provided by its source.
2715    #[cfg(feature = "unstable-msc4274")]
2716    #[allow(clippy::too_many_arguments)]
2717    pub(crate) fn make_gallery_item_type(
2718        content_type: &Mime,
2719        filename: String,
2720        source: MediaSource,
2721        caption: Option<TextMessageEventContent>,
2722        info: Option<AttachmentInfo>,
2723        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2724    ) -> GalleryItemType {
2725        make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2726    }
2727
2728    /// Update the power levels of a select set of users of this room.
2729    ///
2730    /// Issue a `power_levels` state event request to the server, changing the
2731    /// given UserId -> Int levels. May fail if the `power_levels` aren't
2732    /// locally known yet or the server rejects the state event update, e.g.
2733    /// because of insufficient permissions. Neither permissions to update
2734    /// nor whether the data might be stale is checked prior to issuing the
2735    /// request.
2736    pub async fn update_power_levels(
2737        &self,
2738        updates: Vec<(&UserId, Int)>,
2739    ) -> Result<send_state_event::v3::Response> {
2740        let mut power_levels = self.power_levels().await?;
2741
2742        for (user_id, new_level) in updates {
2743            if new_level == power_levels.users_default {
2744                power_levels.users.remove(user_id);
2745            } else {
2746                power_levels.users.insert(user_id.to_owned(), new_level);
2747            }
2748        }
2749
2750        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2751    }
2752
2753    /// Applies a set of power level changes to this room.
2754    ///
2755    /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2756    /// remain unchanged.
2757    pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2758        let mut power_levels = self.power_levels().await?;
2759        power_levels.apply(changes)?;
2760        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2761        Ok(())
2762    }
2763
2764    /// Resets the room's power levels to the default values
2765    ///
2766    /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2767    pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2768        let creators = self.creators().unwrap_or_default();
2769        let rules = self.clone_info().room_version_rules_or_default();
2770
2771        let default_power_levels =
2772            RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2773        let changes = RoomPowerLevelChanges::from(default_power_levels);
2774        self.apply_power_level_changes(changes).await?;
2775        Ok(self.power_levels().await?)
2776    }
2777
2778    /// Gets the suggested role for the user with the provided `user_id`.
2779    ///
2780    /// This method checks the `RoomPowerLevels` events instead of loading the
2781    /// member list and looking for the member.
2782    pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2783        let power_level = self.get_user_power_level(user_id).await?;
2784        Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2785    }
2786
2787    /// Gets the power level the user with the provided `user_id`.
2788    ///
2789    /// This method checks the `RoomPowerLevels` events instead of loading the
2790    /// member list and looking for the member.
2791    pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2792        let event = self.power_levels().await?;
2793        Ok(event.for_user(user_id))
2794    }
2795
2796    /// Gets a map with the `UserId` of users with power levels other than `0`
2797    /// and this power level.
2798    pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2799        let power_levels = self.power_levels().await.ok();
2800        let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2801        if let Some(power_levels) = power_levels {
2802            for (id, level) in power_levels.users.into_iter() {
2803                user_power_levels.insert(id, level.into());
2804            }
2805        }
2806        user_power_levels
2807    }
2808
2809    /// Sets the name of this room.
2810    pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2811        self.send_state_event(RoomNameEventContent::new(name)).await
2812    }
2813
2814    /// Sets a new topic for this room.
2815    pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2816        self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2817    }
2818
2819    /// Sets the new avatar url for this room.
2820    ///
2821    /// # Arguments
2822    /// * `avatar_url` - The owned Matrix uri that represents the avatar
2823    /// * `info` - The optional image info that can be provided for the avatar
2824    pub async fn set_avatar_url(
2825        &self,
2826        url: &MxcUri,
2827        info: Option<avatar::ImageInfo>,
2828    ) -> Result<send_state_event::v3::Response> {
2829        self.ensure_room_joined()?;
2830
2831        let mut room_avatar_event = RoomAvatarEventContent::new();
2832        room_avatar_event.url = Some(url.to_owned());
2833        room_avatar_event.info = info.map(Box::new);
2834
2835        self.send_state_event(room_avatar_event).await
2836    }
2837
2838    /// Removes the avatar from the room
2839    pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2840        self.send_state_event(RoomAvatarEventContent::new()).await
2841    }
2842
2843    /// Uploads a new avatar for this room.
2844    ///
2845    /// # Arguments
2846    /// * `mime` - The mime type describing the data
2847    /// * `data` - The data representation of the avatar
2848    /// * `info` - The optional image info provided for the avatar, the blurhash
2849    ///   and the mimetype will always be updated
2850    pub async fn upload_avatar(
2851        &self,
2852        mime: &Mime,
2853        data: Vec<u8>,
2854        info: Option<avatar::ImageInfo>,
2855    ) -> Result<send_state_event::v3::Response> {
2856        self.ensure_room_joined()?;
2857
2858        let upload_response = self.client.media().upload(mime, data, None).await?;
2859        let mut info = info.unwrap_or_default();
2860        info.blurhash = upload_response.blurhash;
2861        info.mimetype = Some(mime.to_string());
2862
2863        self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2864    }
2865
2866    /// Send a state event with an empty state key to the homeserver.
2867    ///
2868    /// For state events with a non-empty state key, see
2869    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
2870    ///
2871    /// Returns the parsed response from the server.
2872    ///
2873    /// # Arguments
2874    ///
2875    /// * `content` - The content of the state event.
2876    ///
2877    /// # Examples
2878    ///
2879    /// ```no_run
2880    /// # use serde::{Deserialize, Serialize};
2881    /// # async {
2882    /// # let joined_room: matrix_sdk::Room = todo!();
2883    /// use matrix_sdk::ruma::{
2884    ///     EventEncryptionAlgorithm,
2885    ///     events::{
2886    ///         EmptyStateKey, macros::EventContent,
2887    ///         room::encryption::RoomEncryptionEventContent,
2888    ///     },
2889    /// };
2890    ///
2891    /// let encryption_event_content = RoomEncryptionEventContent::new(
2892    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
2893    /// );
2894    /// joined_room.send_state_event(encryption_event_content).await?;
2895    ///
2896    /// // Custom event:
2897    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2898    /// #[ruma_event(
2899    ///     type = "org.matrix.msc_9000.xxx",
2900    ///     kind = State,
2901    ///     state_key_type = EmptyStateKey,
2902    /// )]
2903    /// struct XxxStateEventContent {/* fields... */}
2904    ///
2905    /// let content: XxxStateEventContent = todo!();
2906    /// joined_room.send_state_event(content).await?;
2907    /// # anyhow::Ok(()) };
2908    /// ```
2909    #[cfg(not(feature = "experimental-encrypted-state-events"))]
2910    #[instrument(skip_all)]
2911    pub async fn send_state_event(
2912        &self,
2913        content: impl StateEventContent<StateKey = EmptyStateKey>,
2914    ) -> Result<send_state_event::v3::Response> {
2915        self.send_state_event_for_key(&EmptyStateKey, content).await
2916    }
2917
2918    /// Send a state event with an empty state key to the homeserver.
2919    ///
2920    /// For state events with a non-empty state key, see
2921    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
2922    ///
2923    /// If the experimental state event encryption feature is enabled, this
2924    /// method will transparently encrypt the event if this room is
2925    /// encrypted (except if the event type is considered critical for the room
2926    /// to function, as outlined in [MSC4362][msc4362]).
2927    ///
2928    /// Returns the parsed response from the server.
2929    ///
2930    /// # Arguments
2931    ///
2932    /// * `content` - The content of the state event.
2933    ///
2934    /// # Examples
2935    ///
2936    /// ```no_run
2937    /// # use serde::{Deserialize, Serialize};
2938    /// # async {
2939    /// # let joined_room: matrix_sdk::Room = todo!();
2940    /// use matrix_sdk::ruma::{
2941    ///     EventEncryptionAlgorithm,
2942    ///     events::{
2943    ///         EmptyStateKey, macros::EventContent,
2944    ///         room::encryption::RoomEncryptionEventContent,
2945    ///     },
2946    /// };
2947    ///
2948    /// let encryption_event_content = RoomEncryptionEventContent::new(
2949    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
2950    /// );
2951    /// joined_room.send_state_event(encryption_event_content).await?;
2952    ///
2953    /// // Custom event:
2954    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2955    /// #[ruma_event(
2956    ///     type = "org.matrix.msc_9000.xxx",
2957    ///     kind = State,
2958    ///     state_key_type = EmptyStateKey,
2959    /// )]
2960    /// struct XxxStateEventContent {/* fields... */}
2961    ///
2962    /// let content: XxxStateEventContent = todo!();
2963    /// joined_room.send_state_event(content).await?;
2964    /// # anyhow::Ok(()) };
2965    /// ```
2966    ///
2967    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/4362-encrypted-state.md
2968    #[cfg(feature = "experimental-encrypted-state-events")]
2969    #[instrument(skip_all)]
2970    pub fn send_state_event<'a>(
2971        &'a self,
2972        content: impl StateEventContent<StateKey = EmptyStateKey>,
2973    ) -> SendStateEvent<'a> {
2974        self.send_state_event_for_key(&EmptyStateKey, content)
2975    }
2976
2977    /// Send a state event to the homeserver.
2978    ///
2979    /// Returns the parsed response from the server.
2980    ///
2981    /// # Arguments
2982    ///
2983    /// * `content` - The content of the state event.
2984    ///
2985    /// * `state_key` - A unique key which defines the overwriting semantics for
2986    ///   this piece of room state.
2987    ///
2988    /// # Examples
2989    ///
2990    /// ```no_run
2991    /// # use serde::{Deserialize, Serialize};
2992    /// # async {
2993    /// # let joined_room: matrix_sdk::Room = todo!();
2994    /// use matrix_sdk::ruma::{
2995    ///     events::{
2996    ///         macros::EventContent,
2997    ///         room::member::{RoomMemberEventContent, MembershipState},
2998    ///     },
2999    ///     mxc_uri,
3000    /// };
3001    ///
3002    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3003    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3004    /// content.avatar_url = Some(avatar_url);
3005    ///
3006    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3007    ///
3008    /// // Custom event:
3009    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3010    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3011    /// struct XxxStateEventContent { /* fields... */ }
3012    ///
3013    /// let content: XxxStateEventContent = todo!();
3014    /// joined_room.send_state_event_for_key("foo", content).await?;
3015    /// # anyhow::Ok(()) };
3016    /// ```
3017    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3018    pub async fn send_state_event_for_key<C, K>(
3019        &self,
3020        state_key: &K,
3021        content: C,
3022    ) -> Result<send_state_event::v3::Response>
3023    where
3024        C: StateEventContent,
3025        C::StateKey: Borrow<K>,
3026        K: AsRef<str> + ?Sized,
3027    {
3028        self.ensure_room_joined()?;
3029        let request =
3030            send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3031        let response = self.client.send(request).await?;
3032        Ok(response)
3033    }
3034
3035    /// Send a state event to the homeserver. If state encryption is enabled in
3036    /// this room, the event will be encrypted.
3037    ///
3038    /// If the experimental state event encryption feature is enabled, this
3039    /// method will transparently encrypt the event if this room is
3040    /// encrypted (except if the event type is considered critical for the room
3041    /// to function, as outlined in [MSC4362][msc4362]).
3042    ///
3043    /// Returns the parsed response from the server.
3044    ///
3045    /// # Arguments
3046    ///
3047    /// * `content` - The content of the state event.
3048    ///
3049    /// * `state_key` - A unique key which defines the overwriting semantics for
3050    ///   this piece of room state.
3051    ///
3052    /// # Examples
3053    ///
3054    /// ```no_run
3055    /// # use serde::{Deserialize, Serialize};
3056    /// # async {
3057    /// # let joined_room: matrix_sdk::Room = todo!();
3058    /// use matrix_sdk::ruma::{
3059    ///     events::{
3060    ///         macros::EventContent,
3061    ///         room::member::{RoomMemberEventContent, MembershipState},
3062    ///     },
3063    ///     mxc_uri,
3064    /// };
3065    ///
3066    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3067    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3068    /// content.avatar_url = Some(avatar_url);
3069    ///
3070    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3071    ///
3072    /// // Custom event:
3073    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3074    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3075    /// struct XxxStateEventContent { /* fields... */ }
3076    ///
3077    /// let content: XxxStateEventContent = todo!();
3078    /// joined_room.send_state_event_for_key("foo", content).await?;
3079    /// # anyhow::Ok(()) };
3080    /// ```
3081    ///
3082    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3083    #[cfg(feature = "experimental-encrypted-state-events")]
3084    pub fn send_state_event_for_key<'a, C, K>(
3085        &'a self,
3086        state_key: &K,
3087        content: C,
3088    ) -> SendStateEvent<'a>
3089    where
3090        C: StateEventContent,
3091        C::StateKey: Borrow<K>,
3092        K: AsRef<str> + ?Sized,
3093    {
3094        SendStateEvent::new(self, state_key, content)
3095    }
3096
3097    /// Send a raw room state event to the homeserver.
3098    ///
3099    /// Returns the parsed response from the server.
3100    ///
3101    /// # Arguments
3102    ///
3103    /// * `event_type` - The type of the event that we're sending out.
3104    ///
3105    /// * `state_key` - A unique key which defines the overwriting semantics for
3106    /// this piece of room state. This value is often a zero-length string.
3107    ///
3108    /// * `content` - The content of the event as a raw JSON value. The argument
3109    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3110    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3111    ///
3112    /// # Examples
3113    ///
3114    /// ```no_run
3115    /// use serde_json::json;
3116    ///
3117    /// # async {
3118    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3119    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3120    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3121    ///
3122    /// if let Some(room) = client.get_room(&room_id) {
3123    ///     room.send_state_event_raw("m.room.member", "", json!({
3124    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3125    ///         "displayname": "Alice Margatroid",
3126    ///         "membership": "join",
3127    ///     })).await?;
3128    /// }
3129    /// # anyhow::Ok(()) };
3130    /// ```
3131    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3132    #[instrument(skip_all)]
3133    pub async fn send_state_event_raw(
3134        &self,
3135        event_type: &str,
3136        state_key: &str,
3137        content: impl IntoRawStateEventContent,
3138    ) -> Result<send_state_event::v3::Response> {
3139        self.ensure_room_joined()?;
3140
3141        let request = send_state_event::v3::Request::new_raw(
3142            self.room_id().to_owned(),
3143            event_type.into(),
3144            state_key.to_owned(),
3145            content.into_raw_state_event_content(),
3146        );
3147
3148        Ok(self.client.send(request).await?)
3149    }
3150
3151    /// Send a raw room state event to the homeserver.
3152    ///
3153    /// If the experimental state event encryption feature is enabled, this
3154    /// method will transparently encrypt the event if this room is
3155    /// encrypted (except if the event type is considered critical for the room
3156    /// to function, as outlined in [MSC4362][msc4362]).
3157    ///
3158    /// Returns the parsed response from the server.
3159    ///
3160    /// # Arguments
3161    ///
3162    /// * `event_type` - The type of the event that we're sending out.
3163    ///
3164    /// * `state_key` - A unique key which defines the overwriting semantics for
3165    /// this piece of room state. This value is often a zero-length string.
3166    ///
3167    /// * `content` - The content of the event as a raw JSON value. The argument
3168    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3169    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3170    ///
3171    /// # Examples
3172    ///
3173    /// ```no_run
3174    /// use serde_json::json;
3175    ///
3176    /// # async {
3177    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3178    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3179    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3180    ///
3181    /// if let Some(room) = client.get_room(&room_id) {
3182    ///     room.send_state_event_raw("m.room.member", "", json!({
3183    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3184    ///         "displayname": "Alice Margatroid",
3185    ///         "membership": "join",
3186    ///     })).await?;
3187    /// }
3188    /// # anyhow::Ok(()) };
3189    /// ```
3190    ///
3191    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3192    #[cfg(feature = "experimental-encrypted-state-events")]
3193    #[instrument(skip_all)]
3194    pub fn send_state_event_raw<'a>(
3195        &'a self,
3196        event_type: &'a str,
3197        state_key: &'a str,
3198        content: impl IntoRawStateEventContent,
3199    ) -> SendRawStateEvent<'a> {
3200        SendRawStateEvent::new(self, event_type, state_key, content)
3201    }
3202
3203    /// Strips all information out of an event of the room.
3204    ///
3205    /// Returns the [`redact_event::v3::Response`] from the server.
3206    ///
3207    /// This cannot be undone. Users may redact their own events, and any user
3208    /// with a power level greater than or equal to the redact power level of
3209    /// the room may redact events there.
3210    ///
3211    /// # Arguments
3212    ///
3213    /// * `event_id` - The ID of the event to redact
3214    ///
3215    /// * `reason` - The reason for the event being redacted.
3216    ///
3217    /// * `txn_id` - A unique ID that can be attached to this event as
3218    /// its transaction ID. If not given one is created for the message.
3219    ///
3220    /// # Examples
3221    ///
3222    /// ```no_run
3223    /// use matrix_sdk::ruma::event_id;
3224    ///
3225    /// # async {
3226    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3227    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3228    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3229    /// #
3230    /// if let Some(room) = client.get_room(&room_id) {
3231    ///     let event_id = event_id!("$xxxxxx:example.org");
3232    ///     let reason = Some("Indecent material");
3233    ///     room.redact(&event_id, reason, None).await?;
3234    /// }
3235    /// # anyhow::Ok(()) };
3236    /// ```
3237    #[instrument(skip_all)]
3238    pub async fn redact(
3239        &self,
3240        event_id: &EventId,
3241        reason: Option<&str>,
3242        txn_id: Option<OwnedTransactionId>,
3243    ) -> HttpResult<redact_event::v3::Response> {
3244        let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3245        let request = assign!(
3246            redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3247            { reason: reason.map(ToOwned::to_owned) }
3248        );
3249
3250        self.client.send(request).await
3251    }
3252
3253    /// Get a list of servers that should know this room.
3254    ///
3255    /// Uses the synced members of the room and the suggested [routing
3256    /// algorithm] from the Matrix spec.
3257    ///
3258    /// Returns at most three servers.
3259    ///
3260    /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3261    pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3262        let acl_ev = self
3263            .get_state_event_static::<RoomServerAclEventContent>()
3264            .await?
3265            .and_then(|ev| ev.deserialize().ok());
3266        let acl = acl_ev.as_ref().and_then(|ev| match ev {
3267            SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3268            SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3269        });
3270
3271        // Filter out server names that:
3272        // - Are blocked due to server ACLs
3273        // - Are IP addresses
3274        let members: Vec<_> = self
3275            .members_no_sync(RoomMemberships::JOIN)
3276            .await?
3277            .into_iter()
3278            .filter(|member| {
3279                let server = member.user_id().server_name();
3280                acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3281            })
3282            .collect();
3283
3284        // Get the server of the highest power level user in the room, provided
3285        // they are at least power level 50.
3286        let max = members
3287            .iter()
3288            .max_by_key(|member| member.power_level())
3289            .filter(|max| max.power_level() >= int!(50))
3290            .map(|member| member.user_id().server_name());
3291
3292        // Sort the servers by population.
3293        let servers = members
3294            .iter()
3295            .map(|member| member.user_id().server_name())
3296            .filter(|server| max.filter(|max| max == server).is_none())
3297            .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3298                *servers.entry(server).or_default() += 1;
3299                servers
3300            });
3301        let mut servers: Vec<_> = servers.into_iter().collect();
3302        servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3303
3304        Ok(max
3305            .into_iter()
3306            .chain(servers.into_iter().map(|(name, _)| name))
3307            .take(3)
3308            .map(ToOwned::to_owned)
3309            .collect())
3310    }
3311
3312    /// Get a `matrix.to` permalink to this room.
3313    ///
3314    /// If this room has an alias, we use it. Otherwise, we try to use the
3315    /// synced members in the room for [routing] the room ID.
3316    ///
3317    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3318    pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3319        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3320            return Ok(alias.matrix_to_uri());
3321        }
3322
3323        let via = self.route().await?;
3324        Ok(self.room_id().matrix_to_uri_via(via))
3325    }
3326
3327    /// Get a `matrix:` permalink to this room.
3328    ///
3329    /// If this room has an alias, we use it. Otherwise, we try to use the
3330    /// synced members in the room for [routing] the room ID.
3331    ///
3332    /// # Arguments
3333    ///
3334    /// * `join` - Whether the user should join the room.
3335    ///
3336    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3337    pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3338        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3339            return Ok(alias.matrix_uri(join));
3340        }
3341
3342        let via = self.route().await?;
3343        Ok(self.room_id().matrix_uri_via(via, join))
3344    }
3345
3346    /// Get a `matrix.to` permalink to an event in this room.
3347    ///
3348    /// We try to use the synced members in the room for [routing] the room ID.
3349    ///
3350    /// *Note*: This method does not check if the given event ID is actually
3351    /// part of this room. It needs to be checked before calling this method
3352    /// otherwise the permalink won't work.
3353    ///
3354    /// # Arguments
3355    ///
3356    /// * `event_id` - The ID of the event.
3357    ///
3358    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3359    pub async fn matrix_to_event_permalink(
3360        &self,
3361        event_id: impl Into<OwnedEventId>,
3362    ) -> Result<MatrixToUri> {
3363        // Don't use the alias because an event is tied to a room ID, but an
3364        // alias might point to another room, e.g. after a room upgrade.
3365        let via = self.route().await?;
3366        Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3367    }
3368
3369    /// Get a `matrix:` permalink to an event in this room.
3370    ///
3371    /// We try to use the synced members in the room for [routing] the room ID.
3372    ///
3373    /// *Note*: This method does not check if the given event ID is actually
3374    /// part of this room. It needs to be checked before calling this method
3375    /// otherwise the permalink won't work.
3376    ///
3377    /// # Arguments
3378    ///
3379    /// * `event_id` - The ID of the event.
3380    ///
3381    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3382    pub async fn matrix_event_permalink(
3383        &self,
3384        event_id: impl Into<OwnedEventId>,
3385    ) -> Result<MatrixUri> {
3386        // Don't use the alias because an event is tied to a room ID, but an
3387        // alias might point to another room, e.g. after a room upgrade.
3388        let via = self.route().await?;
3389        Ok(self.room_id().matrix_event_uri_via(event_id, via))
3390    }
3391
3392    /// Get the latest receipt of a user in this room.
3393    ///
3394    /// # Arguments
3395    ///
3396    /// * `receipt_type` - The type of receipt to get.
3397    ///
3398    /// * `thread` - The thread containing the event of the receipt, if any.
3399    ///
3400    /// * `user_id` - The ID of the user.
3401    ///
3402    /// Returns the ID of the event on which the receipt applies and the
3403    /// receipt.
3404    pub async fn load_user_receipt(
3405        &self,
3406        receipt_type: ReceiptType,
3407        thread: ReceiptThread,
3408        user_id: &UserId,
3409    ) -> Result<Option<(OwnedEventId, Receipt)>> {
3410        self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3411    }
3412
3413    /// Load the receipts for an event in this room from storage.
3414    ///
3415    /// # Arguments
3416    ///
3417    /// * `receipt_type` - The type of receipt to get.
3418    ///
3419    /// * `thread` - The thread containing the event of the receipt, if any.
3420    ///
3421    /// * `event_id` - The ID of the event.
3422    ///
3423    /// Returns a list of IDs of users who have sent a receipt for the event and
3424    /// the corresponding receipts.
3425    pub async fn load_event_receipts(
3426        &self,
3427        receipt_type: ReceiptType,
3428        thread: ReceiptThread,
3429        event_id: &EventId,
3430    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3431        self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3432    }
3433
3434    /// Get the push-condition context for this room.
3435    ///
3436    /// Returns `None` if some data couldn't be found. This should only happen
3437    /// in brand new rooms, while we process its state.
3438    pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3439        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3440    }
3441
3442    /// Get the push-condition context for this room, with a choice to include
3443    /// thread subscriptions or not, based on the extra
3444    /// `with_threads_subscriptions` parameter.
3445    ///
3446    /// Returns `None` if some data couldn't be found. This should only happen
3447    /// in brand new rooms, while we process its state.
3448    pub(crate) async fn push_condition_room_ctx_internal(
3449        &self,
3450        with_threads_subscriptions: bool,
3451    ) -> Result<Option<PushConditionRoomCtx>> {
3452        let room_id = self.room_id();
3453        let user_id = self.own_user_id();
3454        let room_info = self.clone_info();
3455        let member_count = room_info.active_members_count();
3456
3457        let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3458            member.name().to_owned()
3459        } else {
3460            return Ok(None);
3461        };
3462
3463        let power_levels = match self.power_levels().await {
3464            Ok(power_levels) => Some(power_levels.into()),
3465            Err(error) => {
3466                if matches!(room_info.state(), RoomState::Joined) {
3467                    // It's normal to not have the power levels in a non-joined room, so don't log
3468                    // the error if the room is not joined
3469                    error!("Could not compute power levels for push conditions: {error}");
3470                }
3471                None
3472            }
3473        };
3474
3475        let mut ctx = assign!(PushConditionRoomCtx::new(
3476            room_id.to_owned(),
3477            UInt::new(member_count).unwrap_or(UInt::MAX),
3478            user_id.to_owned(),
3479            user_display_name,
3480        ),
3481        {
3482            power_levels,
3483        });
3484
3485        if with_threads_subscriptions {
3486            let this = self.clone();
3487            ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3488                let room = this.clone();
3489                Box::pin(async move {
3490                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3491                        maybe_sub.is_some()
3492                    } else {
3493                        false
3494                    }
3495                })
3496            });
3497        }
3498
3499        Ok(Some(ctx))
3500    }
3501
3502    /// Retrieves a [`PushContext`] that can be used to compute the push
3503    /// actions for events.
3504    pub async fn push_context(&self) -> Result<Option<PushContext>> {
3505        self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3506    }
3507
3508    /// Retrieves a [`PushContext`] that can be used to compute the push actions
3509    /// for events, with a choice to include thread subscriptions or not,
3510    /// based on the extra `with_threads_subscriptions` parameter.
3511    #[instrument(skip(self))]
3512    pub(crate) async fn push_context_internal(
3513        &self,
3514        with_threads_subscriptions: bool,
3515    ) -> Result<Option<PushContext>> {
3516        let Some(push_condition_room_ctx) =
3517            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3518        else {
3519            debug!("Could not aggregate push context");
3520            return Ok(None);
3521        };
3522        let push_rules = self.client().account().push_rules().await?;
3523        Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3524    }
3525
3526    /// Get the push actions for the given event with the current room state.
3527    ///
3528    /// Note that it is possible that no push action is returned because the
3529    /// current room state does not have all the required state events.
3530    pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3531        if let Some(ctx) = self.push_context().await? {
3532            Ok(Some(ctx.for_event(event).await))
3533        } else {
3534            Ok(None)
3535        }
3536    }
3537
3538    /// The membership details of the (latest) invite for the logged-in user in
3539    /// this room.
3540    pub async fn invite_details(&self) -> Result<Invite> {
3541        let state = self.state();
3542
3543        if state != RoomState::Invited {
3544            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3545        }
3546
3547        let invitee = self
3548            .get_member_no_sync(self.own_user_id())
3549            .await?
3550            .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3551        let event = invitee.event();
3552        let inviter_id = event.sender();
3553        let inviter = self.get_member_no_sync(inviter_id).await?;
3554        Ok(Invite { invitee, inviter })
3555    }
3556
3557    /// Get the membership details for the current user.
3558    ///
3559    /// Returns:
3560    ///     - If the user was present in the room, a
3561    ///       [`RoomMemberWithSenderInfo`] containing both the user info and the
3562    ///       member info of the sender of the `m.room.member` event.
3563    ///     - If the current user is not present, an error.
3564    pub async fn member_with_sender_info(
3565        &self,
3566        user_id: &UserId,
3567    ) -> Result<RoomMemberWithSenderInfo> {
3568        let Some(member) = self.get_member_no_sync(user_id).await? else {
3569            return Err(Error::InsufficientData);
3570        };
3571
3572        let sender_member =
3573            if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3574                // If the sender room member info is already available, return it
3575                Some(member)
3576            } else if self.are_members_synced() {
3577                // The room members are synced and we couldn't find the sender info
3578                None
3579            } else if self.sync_members().await.is_ok() {
3580                // Try getting the sender room member info again after syncing
3581                self.get_member_no_sync(member.event().sender()).await?
3582            } else {
3583                None
3584            };
3585
3586        Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3587    }
3588
3589    /// Forget this room.
3590    ///
3591    /// This communicates to the homeserver that it should forget the room.
3592    ///
3593    /// Only left or banned-from rooms can be forgotten.
3594    pub async fn forget(&self) -> Result<()> {
3595        let state = self.state();
3596        match state {
3597            RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3598                return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3599                    "Left / Banned",
3600                    state,
3601                ))));
3602            }
3603            RoomState::Left | RoomState::Banned => {}
3604        }
3605
3606        let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3607        let _response = self.client.send(request).await?;
3608
3609        // If it was a DM, remove the room from the `m.direct` global account data.
3610        if self.inner.direct_targets_length() != 0
3611            && let Err(e) = self.set_is_direct(false).await
3612        {
3613            // It is not important whether we managed to remove the room, it will not have
3614            // any consequences, so just log the error.
3615            warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3616        }
3617
3618        self.client.base_client().forget_room(self.inner.room_id()).await?;
3619
3620        Ok(())
3621    }
3622
3623    fn ensure_room_joined(&self) -> Result<()> {
3624        let state = self.state();
3625        if state == RoomState::Joined {
3626            Ok(())
3627        } else {
3628            Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3629        }
3630    }
3631
3632    /// Get the notification mode.
3633    pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3634        if !matches!(self.state(), RoomState::Joined) {
3635            return None;
3636        }
3637
3638        let notification_settings = self.client().notification_settings().await;
3639
3640        // Get the user-defined mode if available
3641        let notification_mode =
3642            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3643
3644        if notification_mode.is_some() {
3645            notification_mode
3646        } else if let Ok(is_encrypted) =
3647            self.latest_encryption_state().await.map(|state| state.is_encrypted())
3648        {
3649            // Otherwise, if encrypted status is available, get the default mode for this
3650            // type of room.
3651            // From the point of view of notification settings, a `one-to-one` room is one
3652            // that involves exactly two people.
3653            let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3654            let default_mode = notification_settings
3655                .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3656                .await;
3657            Some(default_mode)
3658        } else {
3659            None
3660        }
3661    }
3662
3663    /// Get the user-defined notification mode.
3664    ///
3665    /// The result is cached for fast and non-async call. To read the cached
3666    /// result, use
3667    /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3668    //
3669    // Note for maintainers:
3670    //
3671    // The fact the result is cached is an important property. If you change that in
3672    // the future, please review all calls to this method.
3673    pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3674        if !matches!(self.state(), RoomState::Joined) {
3675            return None;
3676        }
3677
3678        let notification_settings = self.client().notification_settings().await;
3679
3680        // Get the user-defined mode if available.
3681        let mode =
3682            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3683
3684        if let Some(mode) = mode {
3685            self.update_cached_user_defined_notification_mode(mode);
3686        }
3687
3688        mode
3689    }
3690
3691    /// Report an event as inappropriate to the homeserver's administrator.
3692    ///
3693    /// # Arguments
3694    ///
3695    /// * `event_id` - The ID of the event to report.
3696    /// * `score` - The score to rate this content.
3697    /// * `reason` - The reason the content is being reported.
3698    ///
3699    /// # Errors
3700    ///
3701    /// Returns an error if the room is not joined or if an error occurs with
3702    /// the request.
3703    pub async fn report_content(
3704        &self,
3705        event_id: OwnedEventId,
3706        score: Option<ReportedContentScore>,
3707        reason: Option<String>,
3708    ) -> Result<report_content::v3::Response> {
3709        let state = self.state();
3710        if state != RoomState::Joined {
3711            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3712        }
3713
3714        let request = report_content::v3::Request::new(
3715            self.inner.room_id().to_owned(),
3716            event_id,
3717            score.map(Into::into),
3718            reason,
3719        );
3720        Ok(self.client.send(request).await?)
3721    }
3722
3723    /// Reports a room as inappropriate to the server.
3724    /// The caller is not required to be joined to the room to report it.
3725    ///
3726    /// # Arguments
3727    ///
3728    /// * `reason` - The reason the room is being reported.
3729    ///
3730    /// # Errors
3731    ///
3732    /// Returns an error if the room is not found or on rate limit
3733    pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3734        let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3735
3736        Ok(self.client.send(request).await?)
3737    }
3738
3739    /// Set a flag on the room to indicate that the user has explicitly marked
3740    /// it as (un)read.
3741    ///
3742    /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3743    /// value as `unread`.
3744    pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3745        if self.is_marked_unread() == unread {
3746            // The request is not necessary.
3747            return Ok(());
3748        }
3749
3750        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3751
3752        let content = MarkedUnreadEventContent::new(unread);
3753
3754        let request = set_room_account_data::v3::Request::new(
3755            user_id.to_owned(),
3756            self.inner.room_id().to_owned(),
3757            &content,
3758        )?;
3759
3760        self.client.send(request).await?;
3761        Ok(())
3762    }
3763
3764    /// Returns the [`RoomEventCache`] associated to this room, assuming the
3765    /// global [`EventCache`] has been enabled for subscription.
3766    pub async fn event_cache(
3767        &self,
3768    ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3769        self.client.event_cache().for_room(self.room_id()).await
3770    }
3771
3772    /// Get the beacon information event in the room for the `user_id`.
3773    ///
3774    /// # Errors
3775    ///
3776    /// Returns an error if the event is redacted, stripped, not found or could
3777    /// not be deserialized.
3778    pub(crate) async fn get_user_beacon_info(
3779        &self,
3780        user_id: &UserId,
3781    ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3782        let raw_event = self
3783            .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3784            .await?
3785            .ok_or(BeaconError::NotFound)?;
3786
3787        match raw_event.deserialize()? {
3788            SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3789            SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3790            SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3791        }
3792    }
3793
3794    /// Start sharing live location in the room.
3795    ///
3796    /// # Arguments
3797    ///
3798    /// * `duration_millis` - The duration for which the live location is
3799    ///   shared, in milliseconds.
3800    /// * `description` - An optional description for the live location share.
3801    ///
3802    /// # Errors
3803    ///
3804    /// Returns an error if the room is not joined or if the state event could
3805    /// not be sent.
3806    pub async fn start_live_location_share(
3807        &self,
3808        duration_millis: u64,
3809        description: Option<String>,
3810    ) -> Result<send_state_event::v3::Response> {
3811        self.ensure_room_joined()?;
3812
3813        self.send_state_event_for_key(
3814            self.own_user_id(),
3815            BeaconInfoEventContent::new(
3816                description,
3817                Duration::from_millis(duration_millis),
3818                true,
3819                None,
3820            ),
3821        )
3822        .await
3823    }
3824
3825    /// Stop sharing live location in the room.
3826    ///
3827    /// # Errors
3828    ///
3829    /// Returns an error if the room is not joined, if the beacon information
3830    /// is redacted or stripped, or if the state event is not found.
3831    pub async fn stop_live_location_share(
3832        &self,
3833    ) -> Result<send_state_event::v3::Response, BeaconError> {
3834        self.ensure_room_joined()?;
3835
3836        let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3837        beacon_info_event.content.stop();
3838        Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3839    }
3840
3841    /// Send a location beacon event in the current room.
3842    ///
3843    /// # Arguments
3844    ///
3845    /// * `geo_uri` - The geo URI of the location beacon.
3846    ///
3847    /// # Errors
3848    ///
3849    /// Returns an error if the room is not joined, if the beacon information
3850    /// is redacted or stripped, if the location share is no longer live,
3851    /// or if the state event is not found.
3852    pub async fn send_location_beacon(
3853        &self,
3854        geo_uri: String,
3855    ) -> Result<send_message_event::v3::Response, BeaconError> {
3856        self.ensure_room_joined()?;
3857
3858        let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3859
3860        if beacon_info_event.content.is_live() {
3861            let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3862            Ok(self.send(content).await?.response)
3863        } else {
3864            Err(BeaconError::NotLive)
3865        }
3866    }
3867
3868    /// Store the given `ComposerDraft` in the state store using the current
3869    /// room id and optional thread root id as identifier.
3870    pub async fn save_composer_draft(
3871        &self,
3872        draft: ComposerDraft,
3873        thread_root: Option<&EventId>,
3874    ) -> Result<()> {
3875        self.client
3876            .state_store()
3877            .set_kv_data(
3878                StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3879                StateStoreDataValue::ComposerDraft(draft),
3880            )
3881            .await?;
3882        Ok(())
3883    }
3884
3885    /// Retrieve the `ComposerDraft` stored in the state store for this room
3886    /// and given thread, if any.
3887    pub async fn load_composer_draft(
3888        &self,
3889        thread_root: Option<&EventId>,
3890    ) -> Result<Option<ComposerDraft>> {
3891        let data = self
3892            .client
3893            .state_store()
3894            .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3895            .await?;
3896        Ok(data.and_then(|d| d.into_composer_draft()))
3897    }
3898
3899    /// Remove the `ComposerDraft` stored in the state store for this room
3900    /// and given thread, if any.
3901    pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3902        self.client
3903            .state_store()
3904            .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3905            .await?;
3906        Ok(())
3907    }
3908
3909    /// Load pinned state events for a room from the `/state` endpoint in the
3910    /// home server.
3911    pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3912        let response = self
3913            .client
3914            .send(get_state_event_for_key::v3::Request::new(
3915                self.room_id().to_owned(),
3916                StateEventType::RoomPinnedEvents,
3917                "".to_owned(),
3918            ))
3919            .await;
3920
3921        match response {
3922            Ok(response) => Ok(Some(
3923                response
3924                    .into_content()
3925                    .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3926                    .pinned,
3927            )),
3928            Err(http_error) => match http_error.as_client_api_error() {
3929                Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3930                _ => Err(http_error.into()),
3931            },
3932        }
3933    }
3934
3935    /// Observe live location sharing events for this room.
3936    ///
3937    /// The returned observable will receive the newest event for each sync
3938    /// response that contains an `m.beacon` event.
3939    ///
3940    /// Returns a stream of [`ObservableLiveLocation`] events from other users
3941    /// in the room, excluding the live location events of the room's own user.
3942    pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3943        ObservableLiveLocation::new(&self.client, self.room_id())
3944    }
3945
3946    /// Subscribe to knock requests in this `Room`.
3947    ///
3948    /// The current requests to join the room will be emitted immediately
3949    /// when subscribing.
3950    ///
3951    /// A new set of knock requests will be emitted whenever:
3952    /// - A new member event is received.
3953    /// - A knock request is marked as seen.
3954    /// - A sync is gappy (limited), so room membership information may be
3955    ///   outdated.
3956    ///
3957    /// Returns both a stream of knock requests and a handle for a task that
3958    /// will clean up the seen knock request ids when possible.
3959    pub async fn subscribe_to_knock_requests(
3960        &self,
3961    ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
3962        let this = Arc::new(self.clone());
3963
3964        let room_member_events_observer =
3965            self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3966
3967        let current_seen_ids = self.get_seen_knock_request_ids().await?;
3968        let mut seen_request_ids_stream = self
3969            .seen_knock_request_ids_map
3970            .subscribe()
3971            .await
3972            .map(|values| values.unwrap_or_default());
3973
3974        let mut room_info_stream = self.subscribe_info();
3975
3976        // Spawn a task that will clean up the seen knock request ids when updated room
3977        // members are received
3978        let clear_seen_ids_handle = spawn({
3979            let this = self.clone();
3980            async move {
3981                let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3982                while member_updates_stream.recv().await.is_ok() {
3983                    // If room members were updated, try to remove outdated seen knock request ids
3984                    if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3985                        warn!("Failed to remove seen knock requests: {err}")
3986                    }
3987                }
3988            }
3989        });
3990
3991        let combined_stream = stream! {
3992            // Emit current requests to join
3993            match this.get_current_join_requests(&current_seen_ids).await {
3994                Ok(initial_requests) => yield initial_requests,
3995                Err(err) => warn!("Failed to get initial requests to join: {err}")
3996            }
3997
3998            let mut requests_stream = room_member_events_observer.subscribe();
3999            let mut seen_ids = current_seen_ids.clone();
4000
4001            loop {
4002                // This is equivalent to a combine stream operation, triggering a new emission
4003                // when any of the branches changes
4004                tokio::select! {
4005                    Some((event, _)) = requests_stream.next() => {
4006                        if let Some(event) = event.as_original() {
4007                            // If we can calculate the membership change, try to emit only when needed
4008                            let emit = if event.prev_content().is_some() {
4009                                matches!(event.membership_change(),
4010                                    MembershipChange::Banned |
4011                                    MembershipChange::Knocked |
4012                                    MembershipChange::KnockAccepted |
4013                                    MembershipChange::KnockDenied |
4014                                    MembershipChange::KnockRetracted
4015                                )
4016                            } else {
4017                                // If we can't calculate the membership change, assume we need to
4018                                // emit updated values
4019                                true
4020                            };
4021
4022                            if emit {
4023                                match this.get_current_join_requests(&seen_ids).await {
4024                                    Ok(requests) => yield requests,
4025                                    Err(err) => {
4026                                        warn!("Failed to get updated knock requests on new member event: {err}")
4027                                    }
4028                                }
4029                            }
4030                        }
4031                    }
4032
4033                    Some(new_seen_ids) = seen_request_ids_stream.next() => {
4034                        // Update the current seen ids
4035                        seen_ids = new_seen_ids;
4036
4037                        // If seen requests have changed we need to recalculate
4038                        // all the knock requests
4039                        match this.get_current_join_requests(&seen_ids).await {
4040                            Ok(requests) => yield requests,
4041                            Err(err) => {
4042                                warn!("Failed to get updated knock requests on seen ids changed: {err}")
4043                            }
4044                        }
4045                    }
4046
4047                    Some(room_info) = room_info_stream.next() => {
4048                        // We need to emit new items when we may have missing room members:
4049                        // this usually happens after a gappy (limited) sync
4050                        if !room_info.are_members_synced() {
4051                            match this.get_current_join_requests(&seen_ids).await {
4052                                Ok(requests) => yield requests,
4053                                Err(err) => {
4054                                    warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4055                                }
4056                            }
4057                        }
4058                    }
4059                    // If the streams in all branches are closed, stop the loop
4060                    else => break,
4061                }
4062            }
4063        };
4064
4065        Ok((combined_stream, clear_seen_ids_handle))
4066    }
4067
4068    async fn get_current_join_requests(
4069        &self,
4070        seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4071    ) -> Result<Vec<KnockRequest>> {
4072        Ok(self
4073            .members(RoomMemberships::KNOCK)
4074            .await?
4075            .into_iter()
4076            .filter_map(|member| {
4077                let event_id = member.event().event_id()?;
4078                Some(KnockRequest::new(
4079                    self,
4080                    event_id,
4081                    member.event().timestamp(),
4082                    KnockRequestMemberInfo::from_member(&member),
4083                    seen_request_ids.contains_key(event_id),
4084                ))
4085            })
4086            .collect())
4087    }
4088
4089    /// Access the room settings related to privacy and visibility.
4090    pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4091        RoomPrivacySettings::new(&self.inner, &self.client)
4092    }
4093
4094    /// Retrieve a list of all the threads for the current room.
4095    ///
4096    /// Since this client-server API is paginated, the return type may include a
4097    /// token used to resuming back-pagination into the list of results, in
4098    /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4099    /// [`ListThreadsOptions::from`] to continue the pagination
4100    /// from the previous position.
4101    pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4102        let request = opts.into_request(self.room_id());
4103
4104        let response = self.client.send(request).await?;
4105
4106        let push_ctx = self.push_context().await?;
4107        let chunk = join_all(
4108            response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4109        )
4110        .await;
4111
4112        Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4113    }
4114
4115    /// Retrieve a list of relations for the given event, according to the given
4116    /// options.
4117    ///
4118    /// Since this client-server API is paginated, the return type may include a
4119    /// token used to resuming back-pagination into the list of results, in
4120    /// [`Relations::prev_batch_token`]. This token can be fed back into
4121    /// [`RelationsOptions::from`] to continue the pagination from the previous
4122    /// position.
4123    ///
4124    /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4125    /// then it must be used with the same
4126    /// [`RelationsOptions::include_relations`] value as the request that
4127    /// returns the `from` token, otherwise the server behavior is undefined.
4128    pub async fn relations(
4129        &self,
4130        event_id: OwnedEventId,
4131        opts: RelationsOptions,
4132    ) -> Result<Relations> {
4133        let relations = opts.send(self, event_id).await;
4134
4135        // Save any new related events to the cache.
4136        if let Ok(Relations { chunk, .. }) = &relations
4137            && let Ok((cache, _handles)) = self.event_cache().await
4138        {
4139            cache.save_events(chunk.clone()).await;
4140        }
4141
4142        relations
4143    }
4144
4145    /// Search this room's [`RoomIndex`] for query and return at most
4146    /// max_number_of_results results.
4147    #[cfg(feature = "experimental-search")]
4148    pub async fn search(
4149        &self,
4150        query: &str,
4151        max_number_of_results: usize,
4152        pagination_offset: Option<usize>,
4153    ) -> Result<Vec<OwnedEventId>, IndexError> {
4154        let mut search_index_guard = self.client.search_index().lock().await;
4155        search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4156    }
4157
4158    /// Subscribe to a given thread in this room.
4159    ///
4160    /// This will subscribe the user to the thread, so that they will receive
4161    /// notifications for that thread specifically.
4162    ///
4163    /// # Arguments
4164    ///
4165    /// - `thread_root`: The ID of the thread root event to subscribe to.
4166    /// - `automatic`: Whether the subscription was made automatically by a
4167    ///   client, not by manual user choice. If set, must include the latest
4168    ///   event ID that's known in the thread and that is causing the automatic
4169    ///   subscription. If unset (i.e. we're now subscribing manually) and there
4170    ///   was a previous automatic subscription, the subscription will be
4171    ///   overridden to a manual one instead.
4172    ///
4173    /// # Returns
4174    ///
4175    /// - A 404 error if the event isn't known, or isn't a thread root.
4176    /// - An `Ok` result if the subscription was successful, or if the server
4177    ///   skipped an automatic subscription (as the user unsubscribed from the
4178    ///   thread after the event causing the automatic subscription).
4179    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4180    pub async fn subscribe_thread(
4181        &self,
4182        thread_root: OwnedEventId,
4183        automatic: Option<OwnedEventId>,
4184    ) -> Result<()> {
4185        let is_automatic = automatic.is_some();
4186
4187        match self
4188            .client
4189            .send(subscribe_thread::unstable::Request::new(
4190                self.room_id().to_owned(),
4191                thread_root.clone(),
4192                automatic,
4193            ))
4194            .await
4195        {
4196            Ok(_response) => {
4197                trace!("Server acknowledged the thread subscription; saving in db");
4198
4199                // Immediately save the result into the database.
4200                self.client
4201                    .state_store()
4202                    .upsert_thread_subscriptions(vec![(
4203                        self.room_id(),
4204                        &thread_root,
4205                        StoredThreadSubscription {
4206                            status: ThreadSubscriptionStatus::Subscribed {
4207                                automatic: is_automatic,
4208                            },
4209                            bump_stamp: None,
4210                        },
4211                    )])
4212                    .await?;
4213
4214                Ok(())
4215            }
4216
4217            Err(err) => {
4218                if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4219                    // In this case: the server indicates that the user unsubscribed *after* the
4220                    // event ID we've used in an automatic subscription; don't
4221                    // save the subscription state in the database, as the
4222                    // previous one should be more correct.
4223                    trace!("Thread subscription skipped: {err}");
4224                    Ok(())
4225                } else {
4226                    // Forward the error to the caller.
4227                    Err(err.into())
4228                }
4229            }
4230        }
4231    }
4232
4233    /// Subscribe to a thread if needed, based on a current subscription to it.
4234    ///
4235    /// This is like [`Self::subscribe_thread`], but it first checks if the user
4236    /// has already subscribed to a thread, so as to minimize sending
4237    /// unnecessary subscriptions which would be ignored by the server.
4238    pub async fn subscribe_thread_if_needed(
4239        &self,
4240        thread_root: &EventId,
4241        automatic: Option<OwnedEventId>,
4242    ) -> Result<()> {
4243        if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4244            // If we have a previous subscription, we should only send the new one if it's
4245            // manual and the previous one was automatic.
4246            if !prev_sub.automatic || automatic.is_some() {
4247                // Either we had already a manual subscription, or we had an automatic one and
4248                // the new one is automatic too: nothing to do!
4249                return Ok(());
4250            }
4251        }
4252        self.subscribe_thread(thread_root.to_owned(), automatic).await
4253    }
4254
4255    /// Unsubscribe from a given thread in this room.
4256    ///
4257    /// # Arguments
4258    ///
4259    /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4260    ///
4261    /// # Returns
4262    ///
4263    /// - An `Ok` result if the unsubscription was successful, or the thread was
4264    ///   already unsubscribed.
4265    /// - A 404 error if the event isn't known, or isn't a thread root.
4266    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4267    pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4268        self.client
4269            .send(unsubscribe_thread::unstable::Request::new(
4270                self.room_id().to_owned(),
4271                thread_root.clone(),
4272            ))
4273            .await?;
4274
4275        trace!("Server acknowledged the thread subscription removal; removed it from db too");
4276
4277        // Immediately save the result into the database.
4278        self.client
4279            .state_store()
4280            .upsert_thread_subscriptions(vec![(
4281                self.room_id(),
4282                &thread_root,
4283                StoredThreadSubscription {
4284                    status: ThreadSubscriptionStatus::Unsubscribed,
4285                    bump_stamp: None,
4286                },
4287            )])
4288            .await?;
4289
4290        Ok(())
4291    }
4292
4293    /// Return the current thread subscription for the given thread root in this
4294    /// room.
4295    ///
4296    /// # Arguments
4297    ///
4298    /// - `thread_root`: The ID of the thread root event to get the subscription
4299    ///   for.
4300    ///
4301    /// # Returns
4302    ///
4303    /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4304    ///   subscription information.
4305    /// - An `Ok` result with `None` if the subscription does not exist, or the
4306    ///   event couldn't be found, or the event isn't a thread.
4307    /// - An error if the request fails for any other reason, such as a network
4308    ///   error.
4309    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4310    pub async fn fetch_thread_subscription(
4311        &self,
4312        thread_root: OwnedEventId,
4313    ) -> Result<Option<ThreadSubscription>> {
4314        let result = self
4315            .client
4316            .send(get_thread_subscription::unstable::Request::new(
4317                self.room_id().to_owned(),
4318                thread_root.clone(),
4319            ))
4320            .await;
4321
4322        let subscription = match result {
4323            Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4324            Err(http_error) => match http_error.as_client_api_error() {
4325                Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4326                _ => return Err(http_error.into()),
4327            },
4328        };
4329
4330        // Keep the database in sync.
4331        if let Some(sub) = &subscription {
4332            self.client
4333                .state_store()
4334                .upsert_thread_subscriptions(vec![(
4335                    self.room_id(),
4336                    &thread_root,
4337                    StoredThreadSubscription {
4338                        status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4339                        bump_stamp: None,
4340                    },
4341                )])
4342                .await?;
4343        } else {
4344            // If the subscription was not found, remove it from the database.
4345            self.client
4346                .state_store()
4347                .remove_thread_subscription(self.room_id(), &thread_root)
4348                .await?;
4349        }
4350
4351        Ok(subscription)
4352    }
4353
4354    /// Return the current thread subscription for the given thread root in this
4355    /// room, by getting it from storage if possible, or fetching it from
4356    /// network otherwise.
4357    ///
4358    /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4359    /// this method.
4360    pub async fn load_or_fetch_thread_subscription(
4361        &self,
4362        thread_root: &EventId,
4363    ) -> Result<Option<ThreadSubscription>> {
4364        // If the thread subscriptions list is outdated, fetch from the server.
4365        if self.client.thread_subscription_catchup().is_outdated() {
4366            return self.fetch_thread_subscription(thread_root.to_owned()).await;
4367        }
4368
4369        // Otherwise, we can rely on the store information.
4370        Ok(self
4371            .client
4372            .state_store()
4373            .load_thread_subscription(self.room_id(), thread_root)
4374            .await
4375            .map(|maybe_sub| {
4376                maybe_sub.and_then(|stored| match stored.status {
4377                    ThreadSubscriptionStatus::Unsubscribed => None,
4378                    ThreadSubscriptionStatus::Subscribed { automatic } => {
4379                        Some(ThreadSubscription { automatic })
4380                    }
4381                })
4382            })?)
4383    }
4384}
4385
4386#[cfg(feature = "e2e-encryption")]
4387impl RoomIdentityProvider for Room {
4388    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
4389        Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
4390    }
4391
4392    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
4393        Box::pin(async {
4394            let members = self
4395                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
4396                .await
4397                .unwrap_or_else(|_| Default::default());
4398
4399            let mut ret: Vec<UserIdentity> = Vec::new();
4400            for member in members {
4401                if let Some(i) = self.user_identity(member.user_id()).await {
4402                    ret.push(i);
4403                }
4404            }
4405            ret
4406        })
4407    }
4408
4409    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
4410        Box::pin(async {
4411            self.client
4412                .encryption()
4413                .get_user_identity(user_id)
4414                .await
4415                .unwrap_or(None)
4416                .map(|u| u.underlying_identity())
4417        })
4418    }
4419}
4420
4421/// A wrapper for a weak client and a room id that allows to lazily retrieve a
4422/// room, only when needed.
4423#[derive(Clone, Debug)]
4424pub(crate) struct WeakRoom {
4425    client: WeakClient,
4426    room_id: OwnedRoomId,
4427}
4428
4429impl WeakRoom {
4430    /// Create a new `WeakRoom` given its weak components.
4431    pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4432        Self { client, room_id }
4433    }
4434
4435    /// Attempts to reconstruct the room.
4436    pub fn get(&self) -> Option<Room> {
4437        self.client.get().and_then(|client| client.get_room(&self.room_id))
4438    }
4439
4440    /// The room id for that room.
4441    pub fn room_id(&self) -> &RoomId {
4442        &self.room_id
4443    }
4444}
4445
4446/// Details of the (latest) invite.
4447#[derive(Debug, Clone)]
4448pub struct Invite {
4449    /// Who has been invited.
4450    pub invitee: RoomMember,
4451    /// Who sent the invite.
4452    pub inviter: Option<RoomMember>,
4453}
4454
4455#[derive(Error, Debug)]
4456enum InvitationError {
4457    #[error("No membership event found")]
4458    EventMissing,
4459}
4460
4461/// Receipts to send all at once.
4462#[derive(Debug, Clone, Default)]
4463#[non_exhaustive]
4464pub struct Receipts {
4465    /// Fully-read marker (room account data).
4466    pub fully_read: Option<OwnedEventId>,
4467    /// Read receipt (public ephemeral room event).
4468    pub public_read_receipt: Option<OwnedEventId>,
4469    /// Read receipt (private ephemeral room event).
4470    pub private_read_receipt: Option<OwnedEventId>,
4471}
4472
4473impl Receipts {
4474    /// Create an empty `Receipts`.
4475    pub fn new() -> Self {
4476        Self::default()
4477    }
4478
4479    /// Set the last event the user has read.
4480    ///
4481    /// It means that the user has read all the events before this event.
4482    ///
4483    /// This is a private marker only visible by the user.
4484    ///
4485    /// Note that this is technically not a receipt as it is persisted in the
4486    /// room account data.
4487    pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4488        self.fully_read = event_id.into();
4489        self
4490    }
4491
4492    /// Set the last event presented to the user and forward it to the other
4493    /// users in the room.
4494    ///
4495    /// This is used to reset the unread messages/notification count and
4496    /// advertise to other users the last event that the user has likely seen.
4497    pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4498        self.public_read_receipt = event_id.into();
4499        self
4500    }
4501
4502    /// Set the last event presented to the user and don't forward it.
4503    ///
4504    /// This is used to reset the unread messages/notification count.
4505    pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4506        self.private_read_receipt = event_id.into();
4507        self
4508    }
4509
4510    /// Whether this `Receipts` is empty.
4511    pub fn is_empty(&self) -> bool {
4512        self.fully_read.is_none()
4513            && self.public_read_receipt.is_none()
4514            && self.private_read_receipt.is_none()
4515    }
4516}
4517
4518/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
4519/// listed by a room, possibly validated by checking the space's state.
4520#[derive(Debug)]
4521pub enum ParentSpace {
4522    /// The room recognizes the given room as its parent, and the parent
4523    /// recognizes it as its child.
4524    Reciprocal(Room),
4525    /// The room recognizes the given room as its parent, but the parent does
4526    /// not recognizes it as its child. However, the author of the
4527    /// `m.space.parent` event in the room has a sufficient power level in the
4528    /// parent to create the child event.
4529    WithPowerlevel(Room),
4530    /// The room recognizes the given room as its parent, but the parent does
4531    /// not recognizes it as its child.
4532    Illegitimate(Room),
4533    /// The room recognizes the given id as its parent room, but we cannot check
4534    /// whether the parent recognizes it as its child.
4535    Unverifiable(OwnedRoomId),
4536}
4537
4538/// The score to rate an inappropriate content.
4539///
4540/// Must be a value between `0`, inoffensive, and `-100`, very offensive.
4541#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
4542pub struct ReportedContentScore(i8);
4543
4544impl ReportedContentScore {
4545    /// The smallest value that can be represented by this type.
4546    ///
4547    /// This is for very offensive content.
4548    pub const MIN: Self = Self(-100);
4549
4550    /// The largest value that can be represented by this type.
4551    ///
4552    /// This is for inoffensive content.
4553    pub const MAX: Self = Self(0);
4554
4555    /// Try to create a `ReportedContentScore` from the provided `i8`.
4556    ///
4557    /// Returns `None` if it is smaller than [`ReportedContentScore::MIN`] or
4558    /// larger than [`ReportedContentScore::MAX`] .
4559    ///
4560    /// This is the same as the `TryFrom<i8>` implementation for
4561    /// `ReportedContentScore`, except that it returns an `Option` instead
4562    /// of a `Result`.
4563    pub fn new(value: i8) -> Option<Self> {
4564        value.try_into().ok()
4565    }
4566
4567    /// Create a `ReportedContentScore` from the provided `i8` clamped to the
4568    /// acceptable interval.
4569    ///
4570    /// The given value gets clamped into the closed interval between
4571    /// [`ReportedContentScore::MIN`] and [`ReportedContentScore::MAX`].
4572    pub fn new_saturating(value: i8) -> Self {
4573        if value > Self::MAX {
4574            Self::MAX
4575        } else if value < Self::MIN {
4576            Self::MIN
4577        } else {
4578            Self(value)
4579        }
4580    }
4581
4582    /// The value of this score.
4583    pub fn value(&self) -> i8 {
4584        self.0
4585    }
4586}
4587
4588impl PartialEq<i8> for ReportedContentScore {
4589    fn eq(&self, other: &i8) -> bool {
4590        self.0.eq(other)
4591    }
4592}
4593
4594impl PartialEq<ReportedContentScore> for i8 {
4595    fn eq(&self, other: &ReportedContentScore) -> bool {
4596        self.eq(&other.0)
4597    }
4598}
4599
4600impl PartialOrd<i8> for ReportedContentScore {
4601    fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4602        self.0.partial_cmp(other)
4603    }
4604}
4605
4606impl PartialOrd<ReportedContentScore> for i8 {
4607    fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4608        self.partial_cmp(&other.0)
4609    }
4610}
4611
4612impl From<ReportedContentScore> for Int {
4613    fn from(value: ReportedContentScore) -> Self {
4614        value.0.into()
4615    }
4616}
4617
4618impl TryFrom<i8> for ReportedContentScore {
4619    type Error = TryFromReportedContentScoreError;
4620
4621    fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4622        if value > Self::MAX || value < Self::MIN {
4623            Err(TryFromReportedContentScoreError(()))
4624        } else {
4625            Ok(Self(value))
4626        }
4627    }
4628}
4629
4630impl TryFrom<i16> for ReportedContentScore {
4631    type Error = TryFromReportedContentScoreError;
4632
4633    fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4634        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4635        value.try_into()
4636    }
4637}
4638
4639impl TryFrom<i32> for ReportedContentScore {
4640    type Error = TryFromReportedContentScoreError;
4641
4642    fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4643        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4644        value.try_into()
4645    }
4646}
4647
4648impl TryFrom<i64> for ReportedContentScore {
4649    type Error = TryFromReportedContentScoreError;
4650
4651    fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4652        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4653        value.try_into()
4654    }
4655}
4656
4657impl TryFrom<Int> for ReportedContentScore {
4658    type Error = TryFromReportedContentScoreError;
4659
4660    fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4661        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4662        value.try_into()
4663    }
4664}
4665
4666trait EventSource {
4667    fn get_event(
4668        &self,
4669        event_id: &EventId,
4670    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4671}
4672
4673impl EventSource for &Room {
4674    async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4675        self.load_or_fetch_event(event_id, None).await
4676    }
4677}
4678
4679/// The error type returned when a checked `ReportedContentScore` conversion
4680/// fails.
4681#[derive(Debug, Clone, Error)]
4682#[error("out of range conversion attempted")]
4683pub struct TryFromReportedContentScoreError(());
4684
4685/// Contains the current user's room member info and the optional room member
4686/// info of the sender of the `m.room.member` event that this info represents.
4687#[derive(Debug)]
4688pub struct RoomMemberWithSenderInfo {
4689    /// The actual room member.
4690    pub room_member: RoomMember,
4691    /// The info of the sender of the event `room_member` is based on, if
4692    /// available.
4693    pub sender_info: Option<RoomMember>,
4694}
4695
4696#[cfg(all(test, not(target_family = "wasm")))]
4697mod tests {
4698    use std::collections::BTreeMap;
4699
4700    use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4701    use matrix_sdk_test::{
4702        JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
4703        event_factory::EventFactory, test_json,
4704    };
4705    use ruma::{
4706        RoomVersionId, event_id,
4707        events::{relation::RelationType, room::member::MembershipState},
4708        int, owned_event_id, room_id, user_id,
4709    };
4710    use wiremock::{
4711        Mock, MockServer, ResponseTemplate,
4712        matchers::{header, method, path_regex},
4713    };
4714
4715    use super::ReportedContentScore;
4716    use crate::{
4717        Client,
4718        config::RequestConfig,
4719        room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4720        test_utils::{
4721            client::mock_matrix_session,
4722            logged_in_client,
4723            mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4724        },
4725    };
4726
4727    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4728    #[async_test]
4729    async fn test_cache_invalidation_while_encrypt() {
4730        use matrix_sdk_base::store::RoomLoadSettings;
4731        use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4732
4733        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4734        let session = mock_matrix_session();
4735
4736        let client = Client::builder()
4737            .homeserver_url("http://localhost:1234")
4738            .request_config(RequestConfig::new().disable_retry())
4739            .sqlite_store(&sqlite_path, None)
4740            .build()
4741            .await
4742            .unwrap();
4743        client
4744            .matrix_auth()
4745            .restore_session(session.clone(), RoomLoadSettings::default())
4746            .await
4747            .unwrap();
4748
4749        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4750
4751        // Mock receiving an event to create an internal room.
4752        let server = MockServer::start().await;
4753        {
4754            Mock::given(method("GET"))
4755                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4756                .and(header("authorization", "Bearer 1234"))
4757                .respond_with(
4758                    ResponseTemplate::new(200)
4759                        .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
4760                )
4761                .mount(&server)
4762                .await;
4763            let response = SyncResponseBuilder::default()
4764                .add_joined_room(
4765                    JoinedRoomBuilder::default()
4766                        .add_state_event(StateTestEvent::Member)
4767                        .add_state_event(StateTestEvent::PowerLevels)
4768                        .add_state_event(StateTestEvent::Encryption),
4769                )
4770                .build_sync_response();
4771            client.base_client().receive_sync_response(response).await.unwrap();
4772        }
4773
4774        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4775
4776        // Step 1, preshare the room keys.
4777        room.preshare_room_key().await.unwrap();
4778
4779        // Step 2, force lock invalidation by pretending another client obtained the
4780        // lock.
4781        {
4782            let client = Client::builder()
4783                .homeserver_url("http://localhost:1234")
4784                .request_config(RequestConfig::new().disable_retry())
4785                .sqlite_store(&sqlite_path, None)
4786                .build()
4787                .await
4788                .unwrap();
4789            client
4790                .matrix_auth()
4791                .restore_session(session.clone(), RoomLoadSettings::default())
4792                .await
4793                .unwrap();
4794            client
4795                .encryption()
4796                .enable_cross_process_store_lock("client2".to_owned())
4797                .await
4798                .unwrap();
4799
4800            let guard = client.encryption().spin_lock_store(None).await.unwrap();
4801            assert!(guard.is_some());
4802        }
4803
4804        // Step 3, take the crypto-store lock.
4805        let guard = client.encryption().spin_lock_store(None).await.unwrap();
4806        assert!(guard.is_some());
4807
4808        // Step 4, try to encrypt a message.
4809        let olm = client.olm_machine().await;
4810        let olm = olm.as_ref().expect("Olm machine wasn't started");
4811
4812        // Now pretend we're encrypting an event; the olm machine shouldn't rely on
4813        // caching the outgoing session before.
4814        let _encrypted_content = olm
4815            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4816            .await
4817            .unwrap();
4818    }
4819
4820    #[test]
4821    fn reported_content_score() {
4822        // i8
4823        let score = ReportedContentScore::new(0).unwrap();
4824        assert_eq!(score.value(), 0);
4825        let score = ReportedContentScore::new(-50).unwrap();
4826        assert_eq!(score.value(), -50);
4827        let score = ReportedContentScore::new(-100).unwrap();
4828        assert_eq!(score.value(), -100);
4829        assert_eq!(ReportedContentScore::new(10), None);
4830        assert_eq!(ReportedContentScore::new(-110), None);
4831
4832        let score = ReportedContentScore::new_saturating(0);
4833        assert_eq!(score.value(), 0);
4834        let score = ReportedContentScore::new_saturating(-50);
4835        assert_eq!(score.value(), -50);
4836        let score = ReportedContentScore::new_saturating(-100);
4837        assert_eq!(score.value(), -100);
4838        let score = ReportedContentScore::new_saturating(10);
4839        assert_eq!(score, ReportedContentScore::MAX);
4840        let score = ReportedContentScore::new_saturating(-110);
4841        assert_eq!(score, ReportedContentScore::MIN);
4842
4843        // i16
4844        let score = ReportedContentScore::try_from(0i16).unwrap();
4845        assert_eq!(score.value(), 0);
4846        let score = ReportedContentScore::try_from(-100i16).unwrap();
4847        assert_eq!(score.value(), -100);
4848        ReportedContentScore::try_from(10i16).unwrap_err();
4849        ReportedContentScore::try_from(-110i16).unwrap_err();
4850
4851        // i32
4852        let score = ReportedContentScore::try_from(0i32).unwrap();
4853        assert_eq!(score.value(), 0);
4854        let score = ReportedContentScore::try_from(-100i32).unwrap();
4855        assert_eq!(score.value(), -100);
4856        ReportedContentScore::try_from(10i32).unwrap_err();
4857        ReportedContentScore::try_from(-110i32).unwrap_err();
4858
4859        // i64
4860        let score = ReportedContentScore::try_from(0i64).unwrap();
4861        assert_eq!(score.value(), 0);
4862        let score = ReportedContentScore::try_from(-100i64).unwrap();
4863        assert_eq!(score.value(), -100);
4864        ReportedContentScore::try_from(10i64).unwrap_err();
4865        ReportedContentScore::try_from(-110i64).unwrap_err();
4866
4867        // Int
4868        let score = ReportedContentScore::try_from(int!(0)).unwrap();
4869        assert_eq!(score.value(), 0);
4870        let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4871        assert_eq!(score.value(), -100);
4872        ReportedContentScore::try_from(int!(10)).unwrap_err();
4873        ReportedContentScore::try_from(int!(-110)).unwrap_err();
4874    }
4875
4876    #[async_test]
4877    async fn test_composer_draft() {
4878        use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4879
4880        let client = logged_in_client(None).await;
4881
4882        let response = SyncResponseBuilder::default()
4883            .add_joined_room(JoinedRoomBuilder::default())
4884            .build_sync_response();
4885        client.base_client().receive_sync_response(response).await.unwrap();
4886        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4887
4888        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4889
4890        // Save 2 drafts, one for the room and one for a thread.
4891
4892        let draft = ComposerDraft {
4893            plain_text: "Hello, world!".to_owned(),
4894            html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4895            draft_type: ComposerDraftType::NewMessage,
4896            attachments: vec![DraftAttachment {
4897                filename: "cat.txt".to_owned(),
4898                content: matrix_sdk_base::DraftAttachmentContent::File {
4899                    data: b"meow".to_vec(),
4900                    mimetype: Some("text/plain".to_owned()),
4901                    size: Some(5),
4902                },
4903            }],
4904        };
4905
4906        room.save_composer_draft(draft.clone(), None).await.unwrap();
4907
4908        let thread_root = owned_event_id!("$thread_root:b.c");
4909        let thread_draft = ComposerDraft {
4910            plain_text: "Hello, thread!".to_owned(),
4911            html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4912            draft_type: ComposerDraftType::NewMessage,
4913            attachments: vec![DraftAttachment {
4914                filename: "dog.txt".to_owned(),
4915                content: matrix_sdk_base::DraftAttachmentContent::File {
4916                    data: b"wuv".to_vec(),
4917                    mimetype: Some("text/plain".to_owned()),
4918                    size: Some(4),
4919                },
4920            }],
4921        };
4922
4923        room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4924
4925        // Check that the room draft was saved correctly
4926        assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4927
4928        // Check that the thread draft was saved correctly
4929        assert_eq!(
4930            room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4931            Some(thread_draft.clone())
4932        );
4933
4934        // Clear the room draft
4935        room.clear_composer_draft(None).await.unwrap();
4936        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4937
4938        // Check that the thread one is still there
4939        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4940
4941        // Clear the thread draft as well
4942        room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4943        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4944    }
4945
4946    #[async_test]
4947    async fn test_mark_join_requests_as_seen() {
4948        let server = MatrixMockServer::new().await;
4949        let client = server.client_builder().build().await;
4950        let event_id = event_id!("$a:b.c");
4951        let room_id = room_id!("!a:b.c");
4952        let user_id = user_id!("@alice:b.c");
4953
4954        let f = EventFactory::new().room(room_id);
4955        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4956            f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4957        ]);
4958        let room = server.sync_room(&client, joined_room_builder).await;
4959
4960        // When loading the initial seen ids, there are none
4961        let seen_ids =
4962            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4963        assert!(seen_ids.is_empty());
4964
4965        // We mark a random event id as seen
4966        room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4967            .await
4968            .expect("Couldn't mark join request as seen");
4969
4970        // Then we can check it was successfully marked as seen
4971        let seen_ids =
4972            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4973        assert_eq!(seen_ids.len(), 1);
4974        assert_eq!(
4975            seen_ids.into_iter().next().expect("No next value"),
4976            (event_id.to_owned(), user_id.to_owned())
4977        )
4978    }
4979
4980    #[async_test]
4981    async fn test_own_room_membership_with_no_own_member_event() {
4982        let server = MatrixMockServer::new().await;
4983        let client = server.client_builder().build().await;
4984        let room_id = room_id!("!a:b.c");
4985
4986        let room = server.sync_joined_room(&client, room_id).await;
4987
4988        // Since there is no member event for the own user, the method fails.
4989        // This should never happen in an actual room.
4990        let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4991        assert!(error.is_some());
4992    }
4993
4994    #[async_test]
4995    async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4996        let server = MatrixMockServer::new().await;
4997        let client = server.client_builder().build().await;
4998        let room_id = room_id!("!a:b.c");
4999        let user_id = user_id!("@example:localhost");
5000
5001        let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5002        let joined_room_builder =
5003            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5004        let room = server.sync_room(&client, joined_room_builder).await;
5005
5006        // When we load the membership details
5007        let ret = room
5008            .member_with_sender_info(client.user_id().unwrap())
5009            .await
5010            .expect("Room member info should be available");
5011
5012        // We get the member info for the current user
5013        assert_eq!(ret.room_member.event().user_id(), user_id);
5014
5015        // But there is no info for the sender
5016        assert!(ret.sender_info.is_none());
5017    }
5018
5019    #[async_test]
5020    async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5021        let server = MatrixMockServer::new().await;
5022        let client = server.client_builder().build().await;
5023        let room_id = room_id!("!a:b.c");
5024        let user_id = user_id!("@example:localhost");
5025
5026        let f = EventFactory::new().room(room_id).sender(user_id);
5027        let joined_room_builder =
5028            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5029        let room = server.sync_room(&client, joined_room_builder).await;
5030
5031        // When we load the membership details
5032        let ret = room
5033            .member_with_sender_info(client.user_id().unwrap())
5034            .await
5035            .expect("Room member info should be available");
5036
5037        // We get the current user's member info
5038        assert_eq!(ret.room_member.event().user_id(), user_id);
5039
5040        // And the sender has the same info, since it's also the current user
5041        assert!(ret.sender_info.is_some());
5042        assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5043    }
5044
5045    #[async_test]
5046    async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5047        let server = MatrixMockServer::new().await;
5048        let client = server.client_builder().build().await;
5049        let room_id = room_id!("!a:b.c");
5050        let user_id = user_id!("@example:localhost");
5051        let sender_id = user_id!("@alice:b.c");
5052
5053        let f = EventFactory::new().room(room_id).sender(sender_id);
5054        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5055            f.member(user_id).into(),
5056            // The sender info comes from the sync
5057            f.member(sender_id).into(),
5058        ]);
5059        let room = server.sync_room(&client, joined_room_builder).await;
5060
5061        // When we load the membership details
5062        let ret = room
5063            .member_with_sender_info(client.user_id().unwrap())
5064            .await
5065            .expect("Room member info should be available");
5066
5067        // We get the current user's member info
5068        assert_eq!(ret.room_member.event().user_id(), user_id);
5069
5070        // And also the sender info from the events received in the sync
5071        assert!(ret.sender_info.is_some());
5072        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5073    }
5074
5075    #[async_test]
5076    async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5077        let server = MatrixMockServer::new().await;
5078        let client = server.client_builder().build().await;
5079        let room_id = room_id!("!a:b.c");
5080        let user_id = user_id!("@example:localhost");
5081        let sender_id = user_id!("@alice:b.c");
5082
5083        let f = EventFactory::new().room(room_id).sender(sender_id);
5084        let joined_room_builder =
5085            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5086        let room = server.sync_room(&client, joined_room_builder).await;
5087
5088        // We'll receive the member info through the /members endpoint
5089        server
5090            .mock_get_members()
5091            .ok(vec![f.member(sender_id).into_raw()])
5092            .mock_once()
5093            .mount()
5094            .await;
5095
5096        // We get the current user's member info
5097        let ret = room
5098            .member_with_sender_info(client.user_id().unwrap())
5099            .await
5100            .expect("Room member info should be available");
5101
5102        // We get the current user's member info
5103        assert_eq!(ret.room_member.event().user_id(), user_id);
5104
5105        // And also the sender info from the /members endpoint
5106        assert!(ret.sender_info.is_some());
5107        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5108    }
5109
5110    #[async_test]
5111    async fn test_list_threads() {
5112        let server = MatrixMockServer::new().await;
5113        let client = server.client_builder().build().await;
5114
5115        let room_id = room_id!("!a:b.c");
5116        let sender_id = user_id!("@alice:b.c");
5117        let f = EventFactory::new().room(room_id).sender(sender_id);
5118
5119        let eid1 = event_id!("$1");
5120        let eid2 = event_id!("$2");
5121        let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5122        let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5123
5124        server
5125            .mock_room_threads()
5126            .ok(batch1.clone(), Some("prev_batch".to_owned()))
5127            .mock_once()
5128            .mount()
5129            .await;
5130        server
5131            .mock_room_threads()
5132            .match_from("prev_batch")
5133            .ok(batch2, None)
5134            .mock_once()
5135            .mount()
5136            .await;
5137
5138        let room = server.sync_joined_room(&client, room_id).await;
5139        let result =
5140            room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5141        assert_eq!(result.chunk.len(), 1);
5142        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5143        assert!(result.prev_batch_token.is_some());
5144
5145        let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5146        let result = room.list_threads(opts).await.expect("Failed to list threads");
5147        assert_eq!(result.chunk.len(), 1);
5148        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5149        assert!(result.prev_batch_token.is_none());
5150    }
5151
5152    #[async_test]
5153    async fn test_relations() {
5154        let server = MatrixMockServer::new().await;
5155        let client = server.client_builder().build().await;
5156
5157        let room_id = room_id!("!a:b.c");
5158        let sender_id = user_id!("@alice:b.c");
5159        let f = EventFactory::new().room(room_id).sender(sender_id);
5160
5161        let target_event_id = owned_event_id!("$target");
5162        let eid1 = event_id!("$1");
5163        let eid2 = event_id!("$2");
5164        let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5165        let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5166
5167        server
5168            .mock_room_relations()
5169            .match_target_event(target_event_id.clone())
5170            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5171            .mock_once()
5172            .mount()
5173            .await;
5174
5175        server
5176            .mock_room_relations()
5177            .match_target_event(target_event_id.clone())
5178            .match_from("next_batch")
5179            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5180            .mock_once()
5181            .mount()
5182            .await;
5183
5184        let room = server.sync_joined_room(&client, room_id).await;
5185
5186        // Main endpoint: no relation type filtered out.
5187        let mut opts = RelationsOptions {
5188            include_relations: IncludeRelations::AllRelations,
5189            ..Default::default()
5190        };
5191        let result = room
5192            .relations(target_event_id.clone(), opts.clone())
5193            .await
5194            .expect("Failed to list relations the first time");
5195        assert_eq!(result.chunk.len(), 1);
5196        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5197        assert!(result.prev_batch_token.is_none());
5198        assert!(result.next_batch_token.is_some());
5199        assert!(result.recursion_depth.is_none());
5200
5201        opts.from = result.next_batch_token;
5202        let result = room
5203            .relations(target_event_id, opts)
5204            .await
5205            .expect("Failed to list relations the second time");
5206        assert_eq!(result.chunk.len(), 1);
5207        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5208        assert!(result.prev_batch_token.is_none());
5209        assert!(result.next_batch_token.is_none());
5210        assert!(result.recursion_depth.is_none());
5211    }
5212
5213    #[async_test]
5214    async fn test_relations_with_reltype() {
5215        let server = MatrixMockServer::new().await;
5216        let client = server.client_builder().build().await;
5217
5218        let room_id = room_id!("!a:b.c");
5219        let sender_id = user_id!("@alice:b.c");
5220        let f = EventFactory::new().room(room_id).sender(sender_id);
5221
5222        let target_event_id = owned_event_id!("$target");
5223        let eid1 = event_id!("$1");
5224        let eid2 = event_id!("$2");
5225        let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5226        let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5227
5228        server
5229            .mock_room_relations()
5230            .match_target_event(target_event_id.clone())
5231            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5232            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5233            .mock_once()
5234            .mount()
5235            .await;
5236
5237        server
5238            .mock_room_relations()
5239            .match_target_event(target_event_id.clone())
5240            .match_from("next_batch")
5241            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5242            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5243            .mock_once()
5244            .mount()
5245            .await;
5246
5247        let room = server.sync_joined_room(&client, room_id).await;
5248
5249        // Reltype-filtered endpoint, for threads \o/
5250        let mut opts = RelationsOptions {
5251            include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5252            ..Default::default()
5253        };
5254        let result = room
5255            .relations(target_event_id.clone(), opts.clone())
5256            .await
5257            .expect("Failed to list relations the first time");
5258        assert_eq!(result.chunk.len(), 1);
5259        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5260        assert!(result.prev_batch_token.is_none());
5261        assert!(result.next_batch_token.is_some());
5262        assert!(result.recursion_depth.is_none());
5263
5264        opts.from = result.next_batch_token;
5265        let result = room
5266            .relations(target_event_id, opts)
5267            .await
5268            .expect("Failed to list relations the second time");
5269        assert_eq!(result.chunk.len(), 1);
5270        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5271        assert!(result.prev_batch_token.is_none());
5272        assert!(result.next_batch_token.is_none());
5273        assert!(result.recursion_depth.is_none());
5274    }
5275
5276    #[async_test]
5277    async fn test_power_levels_computation() {
5278        let server = MatrixMockServer::new().await;
5279        let client = server.client_builder().build().await;
5280
5281        let room_id = room_id!("!a:b.c");
5282        let sender_id = client.user_id().expect("No session id");
5283        let f = EventFactory::new().room(room_id).sender(sender_id);
5284        let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5285
5286        // Computing the power levels will need these 3 state events:
5287        let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5288        let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5289        let room_member_event = f.member(sender_id).into();
5290
5291        // With only the room member event
5292        let room = server
5293            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5294            .await;
5295        let ctx = room
5296            .push_condition_room_ctx()
5297            .await
5298            .expect("Failed to get push condition context")
5299            .expect("Could not get push condition context");
5300
5301        // The internal power levels couldn't be computed
5302        assert!(ctx.power_levels.is_none());
5303
5304        // Adding the room creation event
5305        let room = server
5306            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5307            .await;
5308        let ctx = room
5309            .push_condition_room_ctx()
5310            .await
5311            .expect("Failed to get push condition context")
5312            .expect("Could not get push condition context");
5313
5314        // The internal power levels still couldn't be computed
5315        assert!(ctx.power_levels.is_none());
5316
5317        // With the room member, room creation and the power levels events
5318        let room = server
5319            .sync_room(
5320                &client,
5321                JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5322            )
5323            .await;
5324        let ctx = room
5325            .push_condition_room_ctx()
5326            .await
5327            .expect("Failed to get push condition context")
5328            .expect("Could not get push condition context");
5329
5330        // The internal power levels can finally be computed
5331        assert!(ctx.power_levels.is_some());
5332    }
5333}