matrix_sdk/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18    borrow::Borrow,
19    collections::{BTreeMap, HashMap},
20    future::Future,
21    ops::Deref,
22    sync::Arc,
23    time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30    future::join_all, stream as futures_stream, stream::FuturesUnordered, StreamExt,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
39pub use matrix_sdk_base::store::ThreadSubscription;
40#[cfg(feature = "e2e-encryption")]
41use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
42use matrix_sdk_base::{
43    deserialized_responses::{
44        RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
45    },
46    event_cache::store::media::IgnoreMediaRetentionPolicy,
47    media::MediaThumbnailSettings,
48    store::StateStoreExt,
49    ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
50    StateChanges, StateStoreDataKey, StateStoreDataValue,
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_common::BoxFuture;
54use matrix_sdk_common::{
55    deserialized_responses::TimelineEvent,
56    executor::{spawn, JoinHandle},
57    timeout::timeout,
58};
59#[cfg(feature = "experimental-search")]
60#[cfg(doc)]
61use matrix_sdk_search::index::RoomIndex;
62use mime::Mime;
63use reply::Reply;
64#[cfg(feature = "unstable-msc4274")]
65use ruma::events::room::message::GalleryItemType;
66#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
67use ruma::events::AnySyncMessageLikeEvent;
68#[cfg(feature = "experimental-encrypted-state-events")]
69use ruma::events::AnySyncStateEvent;
70#[cfg(feature = "e2e-encryption")]
71use ruma::events::{
72    room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncTimelineEvent, SyncMessageLikeEvent,
73};
74use ruma::{
75    api::client::{
76        config::{set_global_account_data, set_room_account_data},
77        context,
78        error::ErrorKind,
79        filter::LazyLoadOptions,
80        membership::{
81            ban_user, forget_room, get_member_events,
82            invite_user::{self, v3::InvitationRecipient},
83            kick_user, leave_room, unban_user, Invite3pid,
84        },
85        message::send_message_event,
86        read_marker::set_read_marker,
87        receipt::create_receipt,
88        redact::redact_event,
89        room::{get_room_event, report_content, report_room},
90        state::{get_state_event_for_key, send_state_event},
91        tag::{create_tag, delete_tag},
92        threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
93        typing::create_typing_event::{self, v3::Typing},
94    },
95    assign,
96    events::{
97        beacon::BeaconEventContent,
98        beacon_info::BeaconInfoEventContent,
99        direct::DirectEventContent,
100        marked_unread::MarkedUnreadEventContent,
101        receipt::{Receipt, ReceiptThread, ReceiptType},
102        room::{
103            avatar::{self, RoomAvatarEventContent},
104            encryption::RoomEncryptionEventContent,
105            history_visibility::HistoryVisibility,
106            member::{MembershipChange, SyncRoomMemberEvent},
107            message::{
108                AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
109                FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
110                UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo,
111                VideoMessageEventContent,
112            },
113            name::RoomNameEventContent,
114            pinned_events::RoomPinnedEventsEventContent,
115            power_levels::{
116                RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
117            },
118            server_acl::RoomServerAclEventContent,
119            topic::RoomTopicEventContent,
120            ImageInfo, MediaSource, ThumbnailInfo,
121        },
122        space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
123        tag::{TagInfo, TagName},
124        typing::SyncTypingEvent,
125        AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
126        Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
127        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
128        RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
129        StaticStateEventContent, SyncStateEvent,
130    },
131    int,
132    push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
133    serde::Raw,
134    time::Instant,
135    EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
136    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
137};
138#[cfg(feature = "experimental-encrypted-state-events")]
139use ruma::{
140    events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
141    serde::JsonCastable,
142};
143use serde::de::DeserializeOwned;
144use thiserror::Error;
145use tokio::{join, sync::broadcast};
146use tracing::{debug, error, info, instrument, trace, warn};
147
148use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
149pub use self::{
150    member::{RoomMember, RoomMemberRole},
151    messages::{
152        EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
153        Relations, RelationsOptions, ThreadRoots,
154    },
155};
156#[cfg(doc)]
157use crate::event_cache::EventCache;
158#[cfg(feature = "experimental-encrypted-state-events")]
159use crate::room::futures::{SendRawStateEvent, SendStateEvent};
160use crate::{
161    attachment::{AttachmentConfig, AttachmentInfo},
162    client::WeakClient,
163    config::RequestConfig,
164    error::{BeaconError, WrongRoomState},
165    event_cache::{self, EventCacheDropHandles, RoomEventCache},
166    event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
167    live_location_share::ObservableLiveLocation,
168    media::{MediaFormat, MediaRequestParameters},
169    notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
170    room::{
171        knock_requests::{KnockRequest, KnockRequestMemberInfo},
172        power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
173        privacy_settings::RoomPrivacySettings,
174    },
175    sync::RoomUpdate,
176    utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
177    BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
178};
179#[cfg(feature = "e2e-encryption")]
180use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
181
182pub mod edit;
183pub mod futures;
184pub mod identity_status_changes;
185/// Contains code related to requests to join a room.
186pub mod knock_requests;
187mod member;
188mod messages;
189pub mod power_levels;
190pub mod reply;
191
192/// Contains all the functionality for modifying the privacy settings in a room.
193pub mod privacy_settings;
194
195#[cfg(feature = "e2e-encryption")]
196pub(crate) mod shared_room_history;
197
/// A struct containing methods that are common for Joined, Invited and Left
/// Rooms.
///
/// It pairs the underlying [`BaseRoom`] with the [`Client`] it was created
/// with. The `Deref` implementation of this type hands out the wrapped
/// [`BaseRoom`], so its methods can be called directly on a `Room`.
#[derive(Debug, Clone)]
pub struct Room {
    /// The wrapped base room; this is what `Deref` returns.
    inner: BaseRoom,
    /// The client stored alongside the room, used by the methods of this type
    /// to talk to the homeserver.
    pub(crate) client: Client,
}
205
206impl Deref for Room {
207    type Target = BaseRoom;
208
209    fn deref(&self) -> &Self::Target {
210        &self.inner
211    }
212}
213
/// A four second duration.
///
/// NOTE(review): the use site is outside this part of the file; presumably
/// this is the timeout attached to an outgoing typing notice — confirm.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
/// A three second duration, i.e. one second shorter than
/// `TYPING_NOTICE_TIMEOUT`.
///
/// NOTE(review): the use site is outside this part of the file; presumably
/// this is the delay after which a typing notice is sent again — confirm.
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
216
/// Context allowing to compute the push actions for a given event.
///
/// It bundles the two inputs needed for that computation: the room context
/// the push conditions are evaluated against, and the set of push rules.
#[derive(Debug)]
pub struct PushContext {
    /// The Ruma context used to compute the push actions.
    push_condition_room_ctx: PushConditionRoomCtx,

    /// Push rules for this room, based on the push rules state event, or the
    /// global server default as defined by [`Ruleset::server_default`].
    push_rules: Ruleset,
}
227
228impl PushContext {
229    /// Create a new [`PushContext`] from its inner components.
230    pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
231        Self { push_condition_room_ctx, push_rules }
232    }
233
234    /// Compute the push rules for a given event.
235    pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
236        self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
237    }
238
239    /// Compute the push rules for a given event, with extra logging to help
240    /// debugging.
241    #[doc(hidden)]
242    #[instrument(skip_all)]
243    pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
244        let rules = self
245            .push_rules
246            .iter()
247            .filter_map(|r| {
248                if !r.enabled() {
249                    return None;
250                }
251
252                let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
253
254                let conditions = match r {
255                    AnyPushRuleRef::Override(r) => {
256                        format!("{:?}", r.conditions)
257                    }
258                    AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
259                    AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
260                    AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
261                    AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
262                    _ => "<unknown push rule kind>".to_owned(),
263                };
264
265                Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
266            })
267            .collect::<Vec<_>>()
268            .join("\n");
269        trace!("rules:\n\n{rules}\n\n");
270
271        let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
272
273        if let Some(found) = found {
274            trace!("rule {} matched", found.rule_id());
275            found.actions().to_owned()
276        } else {
277            trace!("no match");
278            Vec::new()
279        }
280    }
281}
282
/// Builds the media variant of `$t` (image, audio, video or file) that
/// matches the top-level type of `$content_type`.
///
/// All arguments but the first are identifiers of local variables at the
/// call site: the MIME type, the file name, the media source, the optional
/// caption and formatted caption, the optional attachment info and the
/// optional thumbnail (a pair of source and info).
macro_rules! make_media_type {
    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $formatted_caption: ident, $info: ident, $thumbnail: ident) => {{
        // With a caption, the caption becomes the body and the file name is
        // sent separately; without one, the file name is the body and no
        // separate file name is sent.
        // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
        let (body, filename) = if let Some(caption) = $caption {
            (caption, Some($filename))
        } else {
            ($filename, None)
        };

        // Split the optional thumbnail pair into two independent options.
        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();

        match $content_type.type_() {
            mime::IMAGE => {
                let mut image_info = $info.map(ImageInfo::from).unwrap_or_default();
                image_info.mimetype = Some($content_type.as_ref().to_owned());
                image_info.thumbnail_source = thumbnail_source;
                image_info.thumbnail_info = thumbnail_info;

                let mut content = ImageMessageEventContent::new(body, $source);
                content.info = Some(Box::new(image_info));
                content.formatted = $formatted_caption;
                content.filename = filename;

                <$t>::Image(content)
            }

            mime::AUDIO => {
                let mut content = AudioMessageEventContent::new(body, $source);
                content.formatted = $formatted_caption;
                content.filename = filename;

                // Voice messages that come with a waveform get the voice
                // marker; the audio details are only attached when the
                // duration is known as well.
                if let Some(AttachmentInfo::Voice {
                    audio_info: voice_info,
                    waveform: Some(samples),
                }) = &$info
                {
                    if let Some(duration) = voice_info.duration {
                        let waveform = samples.iter().map(|sample| (*sample).into()).collect();
                        content.audio =
                            Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
                    }
                    content.voice = Some(UnstableVoiceContentBlock::new());
                }

                let audio_info = assign!($info.map(AudioInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                });

                <$t>::Audio(content.info(Box::new(audio_info)))
            }

            mime::VIDEO => {
                let mut video_info = $info.map(VideoInfo::from).unwrap_or_default();
                video_info.mimetype = Some($content_type.as_ref().to_owned());
                video_info.thumbnail_source = thumbnail_source;
                video_info.thumbnail_info = thumbnail_info;

                let mut content = VideoMessageEventContent::new(body, $source);
                content.info = Some(Box::new(video_info));
                content.formatted = $formatted_caption;
                content.filename = filename;

                <$t>::Video(content)
            }

            // Anything else is sent as a plain file.
            _ => {
                let mut file_info = $info.map(FileInfo::from).unwrap_or_default();
                file_info.mimetype = Some($content_type.as_ref().to_owned());
                file_info.thumbnail_source = thumbnail_source;
                file_info.thumbnail_info = thumbnail_info;

                let mut content = FileMessageEventContent::new(body, $source);
                content.info = Some(Box::new(file_info));
                content.formatted = $formatted_caption;
                content.filename = filename;

                <$t>::File(content)
            }
        }
    }};
}
364
365impl Room {
366    /// Create a new `Room`
367    ///
368    /// # Arguments
369    /// * `client` - The client used to make requests.
370    ///
371    /// * `room` - The underlying room.
372    pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
373        Self { inner: room, client }
374    }
375
376    /// Leave this room.
377    /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
378    /// automatically.
379    ///
380    /// Only invited and joined rooms can be left.
381    #[doc(alias = "reject_invitation")]
382    #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
383    async fn leave_impl(&self) -> (Result<()>, &Room) {
384        let state = self.state();
385        if state == RoomState::Left {
386            return (
387                Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
388                    "Joined or Invited",
389                    state,
390                )))),
391                self,
392            );
393        }
394
395        // If the room was in Invited state we should also forget it when declining the
396        // invite.
397        let should_forget = matches!(self.state(), RoomState::Invited);
398
399        let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
400        let response = self.client.send(request).await;
401
402        // The server can return with an error that is acceptable to ignore. Let's find
403        // which one.
404        if let Err(error) = response {
405            #[allow(clippy::collapsible_match)]
406            let ignore_error = if let Some(error) = error.client_api_error_kind() {
407                match error {
408                    // The user is trying to leave a room but doesn't have permissions to do so.
409                    // Let's consider the user has left the room.
410                    ErrorKind::Forbidden { .. } => true,
411                    _ => false,
412                }
413            } else {
414                false
415            };
416
417            error!(?error, ignore_error, should_forget, "Failed to leave the room");
418
419            if !ignore_error {
420                return (Err(error.into()), self);
421            }
422        }
423
424        if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
425            return (Err(e.into()), self);
426        }
427
428        if should_forget {
429            trace!("Trying to forget the room");
430
431            if let Err(error) = self.forget().await {
432                error!(?error, "Failed to forget the room");
433            }
434        }
435
436        (Ok(()), self)
437    }
438
439    /// Leave this room and all predecessors.
440    /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
441    /// automatically.
442    ///
443    /// Only invited and joined rooms can be left.
444    /// Will return an error if the current room fails to leave but
445    /// will only warn if a predecessor fails to leave.
446    pub async fn leave(&self) -> Result<()> {
447        let mut rooms: Vec<Room> = vec![self.clone()];
448        let mut current_room = self;
449
450        while let Some(predecessor) = current_room.predecessor_room() {
451            let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
452
453            if let Some(predecessor_room) = maybe_predecessor_room {
454                rooms.push(predecessor_room.clone());
455                current_room = rooms.last().expect("Room just pushed so can't be empty");
456            } else {
457                warn!("Cannot find predecessor room");
458                break;
459            }
460        }
461
462        let batch_size = 5;
463
464        let rooms_futures: Vec<_> = rooms
465            .iter()
466            .filter_map(|room| match room.state() {
467                RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
468                    Some(room.leave_impl())
469                }
470                RoomState::Banned | RoomState::Left => None,
471            })
472            .collect();
473
474        let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
475
476        let mut maybe_this_room_failed_with: Option<Error> = None;
477
478        while let Some(result) = futures_stream.next().await {
479            if let (Err(e), room) = result {
480                if room.room_id() == self.room_id() {
481                    maybe_this_room_failed_with = Some(e);
482                } else {
483                    warn!("Failure while attempting to leave predecessor room: {e:?}");
484                }
485            }
486        }
487
488        maybe_this_room_failed_with.map_or(Ok(()), Err)
489    }
490
491    /// Join this room.
492    ///
493    /// Only invited and left rooms can be joined via this method.
494    #[doc(alias = "accept_invitation")]
495    pub async fn join(&self) -> Result<()> {
496        let prev_room_state = self.inner.state();
497
498        if prev_room_state == RoomState::Joined {
499            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
500                "Invited or Left",
501                prev_room_state,
502            ))));
503        }
504
505        self.client.join_room_by_id(self.room_id()).await?;
506
507        Ok(())
508    }
509
510    /// Get the inner client saved in this room instance.
511    ///
512    /// Returns the client this room is part of.
513    pub fn client(&self) -> Client {
514        self.client.clone()
515    }
516
517    /// Get the sync state of this room, i.e. whether it was fully synced with
518    /// the server.
519    pub fn is_synced(&self) -> bool {
520        self.inner.is_state_fully_synced()
521    }
522
523    /// Gets the avatar of this room, if set.
524    ///
525    /// Returns the avatar.
526    /// If a thumbnail is requested no guarantee on the size of the image is
527    /// given.
528    ///
529    /// # Arguments
530    ///
531    /// * `format` - The desired format of the avatar.
532    ///
533    /// # Examples
534    ///
535    /// ```no_run
536    /// # use matrix_sdk::Client;
537    /// # use matrix_sdk::ruma::room_id;
538    /// # use matrix_sdk::media::MediaFormat;
539    /// # use url::Url;
540    /// # let homeserver = Url::parse("http://example.com").unwrap();
541    /// # async {
542    /// # let user = "example";
543    /// let client = Client::new(homeserver).await.unwrap();
544    /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
545    /// let room_id = room_id!("!roomid:example.com");
546    /// let room = client.get_room(&room_id).unwrap();
547    /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
548    ///     std::fs::write("avatar.png", avatar);
549    /// }
550    /// # };
551    /// ```
552    pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
553        let Some(url) = self.avatar_url() else { return Ok(None) };
554        let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
555        Ok(Some(self.client.media().get_media_content(&request, true).await?))
556    }
557
558    /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
559    /// returns a `Messages` struct that contains a chunk of room and state
560    /// events (`RoomEvent` and `AnyStateEvent`).
561    ///
562    /// With the encryption feature, messages are decrypted if possible. If
563    /// decryption fails for an individual message, that message is returned
564    /// undecrypted.
565    ///
566    /// # Examples
567    ///
568    /// ```no_run
569    /// use matrix_sdk::{room::MessagesOptions, Client};
570    /// # use matrix_sdk::ruma::{
571    /// #     api::client::filter::RoomEventFilter,
572    /// #     room_id,
573    /// # };
574    /// # use url::Url;
575    ///
576    /// # let homeserver = Url::parse("http://example.com").unwrap();
577    /// # async {
578    /// let options =
579    ///     MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
580    ///
581    /// let mut client = Client::new(homeserver).await.unwrap();
582    /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
583    /// assert!(room.messages(options).await.is_ok());
584    /// # };
585    /// ```
586    #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
587    pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
588        let room_id = self.inner.room_id();
589        let request = options.into_request(room_id);
590        let http_response = self.client.send(request).await?;
591
592        let push_ctx = self.push_context().await?;
593        let chunk = join_all(
594            http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
595        )
596        .await;
597
598        Ok(Messages {
599            start: http_response.start,
600            end: http_response.end,
601            chunk,
602            state: http_response.state,
603        })
604    }
605
606    /// Register a handler for events of a specific type, within this room.
607    ///
608    /// This method works the same way as [`Client::add_event_handler`], except
609    /// that the handler will only be called for events within this room. See
610    /// that method for more details on event handler functions.
611    ///
612    /// `room.add_event_handler(hdl)` is equivalent to
613    /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
614    /// convenient in your use case.
615    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
616    where
617        Ev: SyncEvent + DeserializeOwned + Send + 'static,
618        H: EventHandler<Ev, Ctx>,
619    {
620        self.client.add_room_event_handler(self.room_id(), handler)
621    }
622
623    /// Subscribe to all updates for this room.
624    ///
625    /// The returned receiver will receive a new message for each sync response
626    /// that contains updates for this room.
627    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
628        self.client.subscribe_to_room_updates(self.room_id())
629    }
630
631    /// Subscribe to typing notifications for this room.
632    ///
633    /// The returned receiver will receive a new vector of user IDs for each
634    /// sync response that contains 'm.typing' event. The current user ID will
635    /// be filtered out.
636    pub fn subscribe_to_typing_notifications(
637        &self,
638    ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
639        let (sender, receiver) = broadcast::channel(16);
640        let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
641            let own_user_id = self.own_user_id().to_owned();
642            move |event: SyncTypingEvent| async move {
643                // Ignore typing notifications from own user.
644                let typing_user_ids = event
645                    .content
646                    .user_ids
647                    .into_iter()
648                    .filter(|user_id| *user_id != own_user_id)
649                    .collect();
650                // Ignore the result. It can only fail if there are no listeners.
651                let _ = sender.send(typing_user_ids);
652            }
653        });
654        let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
655        (drop_guard, receiver)
656    }
657
658    /// Subscribe to updates about users who are in "pin violation" i.e. their
659    /// identity has changed and the user has not yet acknowledged this.
660    ///
661    /// The returned receiver will receive a new vector of
662    /// [`IdentityStatusChange`] each time a /keys/query response shows a
663    /// changed identity for a member of this room, or a sync shows a change
664    /// to the membership of an affected user. (Changes to the current user are
665    /// not directly included, but some changes to the current user's identity
666    /// can trigger changes to how we see other users' identities, which
667    /// will be included.)
668    ///
669    /// The first item in the stream provides the current state of the room:
670    /// each member of the room who is not in "pinned" or "verified" state will
671    /// be included (except the current user).
672    ///
673    /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
674    /// `PinViolation` then a warning should be displayed to the user. If it is
675    /// set to `Pinned` then no warning should be displayed.
676    ///
677    /// Note that if a user who is in pin violation leaves the room, a `Pinned`
678    /// update is sent, to indicate that the warning should be removed, even
679    /// though the user's identity is not necessarily pinned.
680    #[cfg(feature = "e2e-encryption")]
681    pub async fn subscribe_to_identity_status_changes(
682        &self,
683    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
684        IdentityStatusChanges::create_stream(self.clone()).await
685    }
686
687    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
688    /// decrypted if needs be.
689    ///
690    /// Only logs from the crypto crate will indicate a failure to decrypt.
691    #[cfg(not(feature = "experimental-encrypted-state-events"))]
692    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
693    async fn try_decrypt_event(
694        &self,
695        event: Raw<AnyTimelineEvent>,
696        push_ctx: Option<&PushContext>,
697    ) -> TimelineEvent {
698        #[cfg(feature = "e2e-encryption")]
699        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
700            SyncMessageLikeEvent::Original(_),
701        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
702        {
703            if let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await {
704                return event;
705            }
706        }
707
708        let mut event = TimelineEvent::from_plaintext(event.cast());
709        if let Some(push_ctx) = push_ctx {
710            event.set_push_actions(push_ctx.for_event(event.raw()).await);
711        }
712
713        event
714    }
715
716    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
717    /// decrypted if needs be.
718    ///
719    /// Only logs from the crypto crate will indicate a failure to decrypt.
720    #[cfg(feature = "experimental-encrypted-state-events")]
721    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
722    async fn try_decrypt_event(
723        &self,
724        event: Raw<AnyTimelineEvent>,
725        push_ctx: Option<&PushContext>,
726    ) -> TimelineEvent {
727        // If we have either an encrypted message-like or state event, try to decrypt.
728        match event.deserialize_as::<AnySyncTimelineEvent>() {
729            Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
730                SyncMessageLikeEvent::Original(_),
731            ))) => {
732                if let Ok(event) = self
733                    .decrypt_event(
734                        event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
735                        push_ctx,
736                    )
737                    .await
738                {
739                    return event;
740                }
741            }
742            Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
743                SyncStateEvent::Original(_),
744            ))) => {
745                if let Ok(event) = self
746                    .decrypt_event(
747                        event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
748                        push_ctx,
749                    )
750                    .await
751                {
752                    return event;
753                }
754            }
755            _ => {}
756        }
757
758        let mut event = TimelineEvent::from_plaintext(event.cast());
759        if let Some(push_ctx) = push_ctx {
760            event.set_push_actions(push_ctx.for_event(event.raw()).await);
761        }
762
763        event
764    }
765
766    /// Fetch the event with the given `EventId` in this room.
767    ///
768    /// It uses the given [`RequestConfig`] if provided, or the client's default
769    /// one otherwise.
770    pub async fn event(
771        &self,
772        event_id: &EventId,
773        request_config: Option<RequestConfig>,
774    ) -> Result<TimelineEvent> {
775        let request =
776            get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
777
778        let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
779        let push_ctx = self.push_context().await?;
780        let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
781
782        // Save the event into the event cache, if it's set up.
783        if let Ok((cache, _handles)) = self.event_cache().await {
784            cache.save_events([event.clone()]).await;
785        }
786
787        Ok(event)
788    }
789
790    /// Try to load the event from the [`EventCache`][crate::event_cache], if
791    /// it's enabled, or fetch it from the homeserver.
792    ///
793    /// When running the request against the homeserver, it uses the given
794    /// [`RequestConfig`] if provided, or the client's default one
795    /// otherwise.
796    pub async fn load_or_fetch_event(
797        &self,
798        event_id: &EventId,
799        request_config: Option<RequestConfig>,
800    ) -> Result<TimelineEvent> {
801        match self.event_cache().await {
802            Ok((event_cache, _drop_handles)) => {
803                if let Some(event) = event_cache.find_event(event_id).await {
804                    return Ok(event);
805                }
806                // Fallthrough: try with a request.
807            }
808            Err(err) => {
809                debug!("error when getting the event cache: {err}");
810            }
811        }
812        self.event(event_id, request_config).await
813    }
814
815    /// Fetch the event with the given `EventId` in this room, using the
816    /// `/context` endpoint to get more information.
817    pub async fn event_with_context(
818        &self,
819        event_id: &EventId,
820        lazy_load_members: bool,
821        context_size: UInt,
822        request_config: Option<RequestConfig>,
823    ) -> Result<EventWithContextResponse> {
824        let mut request =
825            context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
826
827        request.limit = context_size;
828
829        if lazy_load_members {
830            request.filter.lazy_load_options =
831                LazyLoadOptions::Enabled { include_redundant_members: false };
832        }
833
834        let response = self.client.send(request).with_request_config(request_config).await?;
835
836        let push_ctx = self.push_context().await?;
837        let push_ctx = push_ctx.as_ref();
838        let target_event = if let Some(event) = response.event {
839            Some(self.try_decrypt_event(event, push_ctx).await)
840        } else {
841            None
842        };
843
844        // Note: the joined future will fail if any future failed, but
845        // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
846        // decryption error, so we should prevent against most bad cases here.
847        let (events_before, events_after) = join!(
848            join_all(
849                response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
850            ),
851            join_all(
852                response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
853            ),
854        );
855
856        // Save the loaded events into the event cache, if it's set up.
857        if let Ok((cache, _handles)) = self.event_cache().await {
858            let mut events_to_save: Vec<TimelineEvent> = Vec::new();
859            if let Some(event) = &target_event {
860                events_to_save.push(event.clone());
861            }
862
863            for event in &events_before {
864                events_to_save.push(event.clone());
865            }
866
867            for event in &events_after {
868                events_to_save.push(event.clone());
869            }
870
871            cache.save_events(events_to_save).await;
872        }
873
874        Ok(EventWithContextResponse {
875            event: target_event,
876            events_before,
877            events_after,
878            state: response.state,
879            prev_batch_token: response.start,
880            next_batch_token: response.end,
881        })
882    }
883
884    pub(crate) async fn request_members(&self) -> Result<()> {
885        self.client
886            .locks()
887            .members_request_deduplicated_handler
888            .run(self.room_id().to_owned(), async move {
889                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
890                let response = self
891                    .client
892                    .send(request.clone())
893                    .with_request_config(
894                        // In some cases it can take longer than 30s to load:
895                        // https://github.com/element-hq/synapse/issues/16872
896                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
897                    )
898                    .await?;
899
900                // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
901                Box::pin(self.client.base_client().receive_all_members(
902                    self.room_id(),
903                    &request,
904                    &response,
905                ))
906                .await?;
907
908                Ok(())
909            })
910            .await
911    }
912
913    /// Request to update the encryption state for this room.
914    ///
915    /// It does nothing if the encryption state is already
916    /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
917    pub async fn request_encryption_state(&self) -> Result<()> {
918        if !self.inner.encryption_state().is_unknown() {
919            return Ok(());
920        }
921
922        self.client
923            .locks()
924            .encryption_state_deduplicated_handler
925            .run(self.room_id().to_owned(), async move {
926                // Request the event from the server.
927                let request = get_state_event_for_key::v3::Request::new(
928                    self.room_id().to_owned(),
929                    StateEventType::RoomEncryption,
930                    "".to_owned(),
931                );
932                let response = match self.client.send(request).await {
933                    Ok(response) => Some(
934                        response
935                            .into_content()
936                            .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
937                    ),
938                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
939                    Err(err) => return Err(err.into()),
940                };
941
942                let _sync_lock = self.client.base_client().sync_lock().lock().await;
943
944                // Persist the event and the fact that we requested it from the server in
945                // `RoomInfo`.
946                let mut room_info = self.clone_info();
947                room_info.mark_encryption_state_synced();
948                room_info.set_encryption_event(response.clone());
949                let mut changes = StateChanges::default();
950                changes.add_room(room_info.clone());
951
952                self.client.state_store().save_changes(&changes).await?;
953                self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
954
955                Ok(())
956            })
957            .await
958    }
959
960    /// Check the encryption state of this room.
961    ///
962    /// If the result is [`EncryptionState::Unknown`], one might want to call
963    /// [`Room::request_encryption_state`].
964    pub fn encryption_state(&self) -> EncryptionState {
965        self.inner.encryption_state()
966    }
967
968    /// Force to update the encryption state by calling
969    /// [`Room::request_encryption_state`], and then calling
970    /// [`Room::encryption_state`].
971    ///
972    /// This method is useful to ensure the encryption state is up-to-date.
973    pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
974        self.request_encryption_state().await?;
975
976        Ok(self.encryption_state())
977    }
978
979    /// Gets additional context info about the client crypto.
980    #[cfg(feature = "e2e-encryption")]
981    pub async fn crypto_context_info(&self) -> CryptoContextInfo {
982        let encryption = self.client.encryption();
983
984        let this_device_is_verified = match encryption.get_own_device().await {
985            Ok(Some(device)) => device.is_verified_with_cross_signing(),
986
987            // Should not happen, there will always be an own device
988            _ => true,
989        };
990
991        let backup_exists_on_server =
992            encryption.backups().exists_on_server().await.unwrap_or(false);
993
994        CryptoContextInfo {
995            device_creation_ts: encryption.device_creation_timestamp().await,
996            this_device_is_verified,
997            is_backup_configured: encryption.backups().state() == BackupState::Enabled,
998            backup_exists_on_server,
999        }
1000    }
1001
1002    fn are_events_visible(&self) -> bool {
1003        if let RoomState::Invited = self.inner.state() {
1004            return matches!(
1005                self.inner.history_visibility_or_default(),
1006                HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1007            );
1008        }
1009
1010        true
1011    }
1012
1013    /// Sync the member list with the server.
1014    ///
1015    /// This method will de-duplicate requests if it is called multiple times in
1016    /// quick succession, in that case the return value will be `None`. This
1017    /// method does nothing if the members are already synced.
1018    pub async fn sync_members(&self) -> Result<()> {
1019        if !self.are_events_visible() {
1020            return Ok(());
1021        }
1022
1023        if !self.are_members_synced() {
1024            self.request_members().await
1025        } else {
1026            Ok(())
1027        }
1028    }
1029
1030    /// Get a specific member of this room.
1031    ///
1032    /// *Note*: This method will fetch the members from the homeserver if the
1033    /// member list isn't synchronized due to member lazy loading. Because of
1034    /// that it might panic if it isn't run on a tokio thread.
1035    ///
1036    /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1037    /// method that doesn't do any requests.
1038    ///
1039    /// # Arguments
1040    ///
1041    /// * `user_id` - The ID of the user that should be fetched out of the
1042    ///   store.
1043    pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1044        self.sync_members().await?;
1045        self.get_member_no_sync(user_id).await
1046    }
1047
1048    /// Get a specific member of this room.
1049    ///
1050    /// *Note*: This method will not fetch the members from the homeserver if
1051    /// the member list isn't synchronized due to member lazy loading. Thus,
1052    /// members could be missing.
1053    ///
1054    /// Use [get_member()](#method.get_member) if you want to ensure to always
1055    /// have the full member list to chose from.
1056    ///
1057    /// # Arguments
1058    ///
1059    /// * `user_id` - The ID of the user that should be fetched out of the
1060    ///   store.
1061    pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1062        Ok(self
1063            .inner
1064            .get_member(user_id)
1065            .await?
1066            .map(|member| RoomMember::new(self.client.clone(), member)))
1067    }
1068
1069    /// Get members for this room, with the given memberships.
1070    ///
1071    /// *Note*: This method will fetch the members from the homeserver if the
1072    /// member list isn't synchronized due to member lazy loading. Because of
1073    /// that it might panic if it isn't run on a tokio thread.
1074    ///
1075    /// Use [members_no_sync()](#method.members_no_sync) if you want a
1076    /// method that doesn't do any requests.
1077    pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1078        self.sync_members().await?;
1079        self.members_no_sync(memberships).await
1080    }
1081
1082    /// Get members for this room, with the given memberships.
1083    ///
1084    /// *Note*: This method will not fetch the members from the homeserver if
1085    /// the member list isn't synchronized due to member lazy loading. Thus,
1086    /// members could be missing.
1087    ///
1088    /// Use [members()](#method.members) if you want to ensure to always get
1089    /// the full member list.
1090    pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1091        Ok(self
1092            .inner
1093            .members(memberships)
1094            .await?
1095            .into_iter()
1096            .map(|member| RoomMember::new(self.client.clone(), member))
1097            .collect())
1098    }
1099
1100    /// Get all state events of a given type in this room.
1101    pub async fn get_state_events(
1102        &self,
1103        event_type: StateEventType,
1104    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1105        self.client
1106            .state_store()
1107            .get_state_events(self.room_id(), event_type)
1108            .await
1109            .map_err(Into::into)
1110    }
1111
1112    /// Get all state events of a given statically-known type in this room.
1113    ///
1114    /// # Examples
1115    ///
1116    /// ```no_run
1117    /// # async {
1118    /// # let room: matrix_sdk::Room = todo!();
1119    /// use matrix_sdk::ruma::{
1120    ///     events::room::member::RoomMemberEventContent, serde::Raw,
1121    /// };
1122    ///
1123    /// let room_members =
1124    ///     room.get_state_events_static::<RoomMemberEventContent>().await?;
1125    /// # anyhow::Ok(())
1126    /// # };
1127    /// ```
1128    pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1129    where
1130        C: StaticEventContent<IsPrefix = ruma::events::False>
1131            + StaticStateEventContent
1132            + RedactContent,
1133        C::Redacted: RedactedStateEventContent,
1134    {
1135        Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1136    }
1137
1138    /// Get the state events of a given type with the given state keys in this
1139    /// room.
1140    pub async fn get_state_events_for_keys(
1141        &self,
1142        event_type: StateEventType,
1143        state_keys: &[&str],
1144    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1145        self.client
1146            .state_store()
1147            .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1148            .await
1149            .map_err(Into::into)
1150    }
1151
1152    /// Get the state events of a given statically-known type with the given
1153    /// state keys in this room.
1154    ///
1155    /// # Examples
1156    ///
1157    /// ```no_run
1158    /// # async {
1159    /// # let room: matrix_sdk::Room = todo!();
1160    /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1161    /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1162    ///
1163    /// let room_members = room
1164    ///     .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1165    ///         user_ids,
1166    ///     )
1167    ///     .await?;
1168    /// # anyhow::Ok(())
1169    /// # };
1170    /// ```
1171    pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1172        &self,
1173        state_keys: I,
1174    ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1175    where
1176        C: StaticEventContent<IsPrefix = ruma::events::False>
1177            + StaticStateEventContent
1178            + RedactContent,
1179        C::StateKey: Borrow<K>,
1180        C::Redacted: RedactedStateEventContent,
1181        K: AsRef<str> + Sized + Sync + 'a,
1182        I: IntoIterator<Item = &'a K> + Send,
1183        I::IntoIter: Send,
1184    {
1185        Ok(self
1186            .client
1187            .state_store()
1188            .get_state_events_for_keys_static(self.room_id(), state_keys)
1189            .await?)
1190    }
1191
1192    /// Get a specific state event in this room.
1193    pub async fn get_state_event(
1194        &self,
1195        event_type: StateEventType,
1196        state_key: &str,
1197    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1198        self.client
1199            .state_store()
1200            .get_state_event(self.room_id(), event_type, state_key)
1201            .await
1202            .map_err(Into::into)
1203    }
1204
1205    /// Get a specific state event of statically-known type with an empty state
1206    /// key in this room.
1207    ///
1208    /// # Examples
1209    ///
1210    /// ```no_run
1211    /// # async {
1212    /// # let room: matrix_sdk::Room = todo!();
1213    /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1214    ///
1215    /// let power_levels = room
1216    ///     .get_state_event_static::<RoomPowerLevelsEventContent>()
1217    ///     .await?
1218    ///     .expect("every room has a power_levels event")
1219    ///     .deserialize()?;
1220    /// # anyhow::Ok(())
1221    /// # };
1222    /// ```
1223    pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1224    where
1225        C: StaticEventContent<IsPrefix = ruma::events::False>
1226            + StaticStateEventContent<StateKey = EmptyStateKey>
1227            + RedactContent,
1228        C::Redacted: RedactedStateEventContent,
1229    {
1230        self.get_state_event_static_for_key(&EmptyStateKey).await
1231    }
1232
1233    /// Get a specific state event of statically-known type in this room.
1234    ///
1235    /// # Examples
1236    ///
1237    /// ```no_run
1238    /// # async {
1239    /// # let room: matrix_sdk::Room = todo!();
1240    /// use matrix_sdk::ruma::{
1241    ///     events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1242    /// };
1243    ///
1244    /// let member_event = room
1245    ///     .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1246    ///         "@alice:example.org"
1247    ///     ))
1248    ///     .await?;
1249    /// # anyhow::Ok(())
1250    /// # };
1251    /// ```
1252    pub async fn get_state_event_static_for_key<C, K>(
1253        &self,
1254        state_key: &K,
1255    ) -> Result<Option<RawSyncOrStrippedState<C>>>
1256    where
1257        C: StaticEventContent<IsPrefix = ruma::events::False>
1258            + StaticStateEventContent
1259            + RedactContent,
1260        C::StateKey: Borrow<K>,
1261        C::Redacted: RedactedStateEventContent,
1262        K: AsRef<str> + ?Sized + Sync,
1263    {
1264        Ok(self
1265            .client
1266            .state_store()
1267            .get_state_event_static_for_key(self.room_id(), state_key)
1268            .await?)
1269    }
1270
1271    /// Returns the parents this room advertises as its parents.
1272    ///
1273    /// Results are in no particular order.
1274    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1275        // Implements this algorithm:
1276        // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1277
1278        // Get all m.space.parent events for this room
1279        Ok(self
1280            .get_state_events_static::<SpaceParentEventContent>()
1281            .await?
1282            .into_iter()
1283            // Extract state key (ie. the parent's id) and sender
1284            .filter_map(|parent_event| match parent_event.deserialize() {
1285                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1286                    Some((e.state_key.to_owned(), e.sender))
1287                }
1288                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1289                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1290                Err(e) => {
1291                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1292                    None
1293                }
1294            })
1295            // Check whether the parent recognizes this room as its child
1296            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1297                let Some(parent_room) = self.client.get_room(&state_key) else {
1298                    // We are not in the room, cannot check if the relationship is reciprocal
1299                    // TODO: try peeking into the room
1300                    return Ok(ParentSpace::Unverifiable(state_key));
1301                };
1302                // Get the m.space.child state of the parent with this room's id
1303                // as state key.
1304                if let Some(child_event) = parent_room
1305                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1306                    .await?
1307                {
1308                    match child_event.deserialize() {
1309                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1310                            // There is a valid m.space.child in the parent pointing to
1311                            // this room
1312                            return Ok(ParentSpace::Reciprocal(parent_room));
1313                        }
1314                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1315                        Ok(SyncOrStrippedState::Stripped(_)) => {}
1316                        Err(e) => {
1317                            info!(
1318                                room_id = ?self.room_id(), parent_room_id = ?state_key,
1319                                "Could not deserialize m.space.child: {e}"
1320                            );
1321                        }
1322                    }
1323                    // Otherwise the event is either invalid or redacted. If
1324                    // redacted it would be missing the
1325                    // `via` key, thereby invalidating that end of the
1326                    // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1327                }
1328
1329                // No reciprocal m.space.child found, let's check if the sender has the
1330                // power to set it
1331                let Some(member) = parent_room.get_member(&sender).await? else {
1332                    // Sender is not even in the parent room
1333                    return Ok(ParentSpace::Illegitimate(parent_room));
1334                };
1335
1336                if member.can_send_state(StateEventType::SpaceChild) {
1337                    // Sender does have the power to set m.room.child
1338                    Ok(ParentSpace::WithPowerlevel(parent_room))
1339                } else {
1340                    Ok(ParentSpace::Illegitimate(parent_room))
1341                }
1342            })
1343            .collect::<FuturesUnordered<_>>())
1344    }
1345
1346    /// Read account data in this room, from storage.
1347    pub async fn account_data(
1348        &self,
1349        data_type: RoomAccountDataEventType,
1350    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1351        self.client
1352            .state_store()
1353            .get_room_account_data_event(self.room_id(), data_type)
1354            .await
1355            .map_err(Into::into)
1356    }
1357
1358    /// Get account data of a statically-known type in this room, from storage.
1359    ///
1360    /// # Examples
1361    ///
1362    /// ```no_run
1363    /// # async {
1364    /// # let room: matrix_sdk::Room = todo!();
1365    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1366    ///
1367    /// match room.account_data_static::<FullyReadEventContent>().await? {
1368    ///     Some(fully_read) => {
1369    ///         println!("Found read marker: {:?}", fully_read.deserialize()?)
1370    ///     }
1371    ///     None => println!("No read marker for this room"),
1372    /// }
1373    /// # anyhow::Ok(())
1374    /// # };
1375    /// ```
1376    pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1377    where
1378        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1379    {
1380        Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1381    }
1382
1383    /// Check if all members of this room are verified and all their devices are
1384    /// verified.
1385    ///
1386    /// Returns true if all devices in the room are verified, otherwise false.
1387    #[cfg(feature = "e2e-encryption")]
1388    pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1389        let user_ids = self
1390            .client
1391            .state_store()
1392            .get_user_ids(self.room_id(), RoomMemberships::empty())
1393            .await?;
1394
1395        for user_id in user_ids {
1396            let devices = self.client.encryption().get_user_devices(&user_id).await?;
1397            let any_unverified = devices.devices().any(|d| !d.is_verified());
1398
1399            if any_unverified {
1400                return Ok(false);
1401            }
1402        }
1403
1404        Ok(true)
1405    }
1406
1407    /// Set the given account data event for this room.
1408    ///
1409    /// # Example
1410    /// ```
1411    /// # async {
1412    /// # let room: matrix_sdk::Room = todo!();
1413    /// # let event_id: ruma::OwnedEventId = todo!();
1414    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1415    /// let content = FullyReadEventContent::new(event_id);
1416    ///
1417    /// room.set_account_data(content).await?;
1418    /// # anyhow::Ok(())
1419    /// # };
1420    /// ```
1421    pub async fn set_account_data<T>(
1422        &self,
1423        content: T,
1424    ) -> Result<set_room_account_data::v3::Response>
1425    where
1426        T: RoomAccountDataEventContent,
1427    {
1428        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1429
1430        let request = set_room_account_data::v3::Request::new(
1431            own_user.to_owned(),
1432            self.room_id().to_owned(),
1433            &content,
1434        )?;
1435
1436        Ok(self.client.send(request).await?)
1437    }
1438
1439    /// Set the given raw account data event in this room.
1440    ///
1441    /// # Example
1442    /// ```
1443    /// # async {
1444    /// # let room: matrix_sdk::Room = todo!();
1445    /// use matrix_sdk::ruma::{
1446    ///     events::{
1447    ///         marked_unread::MarkedUnreadEventContent,
1448    ///         AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1449    ///     },
1450    ///     serde::Raw,
1451    /// };
1452    /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1453    /// let full_event: AnyRoomAccountDataEventContent =
1454    ///     marked_unread_content.clone().into();
1455    /// room.set_account_data_raw(
1456    ///     marked_unread_content.event_type(),
1457    ///     Raw::new(&full_event).unwrap(),
1458    /// )
1459    /// .await?;
1460    /// # anyhow::Ok(())
1461    /// # };
1462    /// ```
1463    pub async fn set_account_data_raw(
1464        &self,
1465        event_type: RoomAccountDataEventType,
1466        content: Raw<AnyRoomAccountDataEventContent>,
1467    ) -> Result<set_room_account_data::v3::Response> {
1468        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1469
1470        let request = set_room_account_data::v3::Request::new_raw(
1471            own_user.to_owned(),
1472            self.room_id().to_owned(),
1473            event_type,
1474            content,
1475        );
1476
1477        Ok(self.client.send(request).await?)
1478    }
1479
1480    /// Adds a tag to the room, or updates it if it already exists.
1481    ///
1482    /// Returns the [`create_tag::v3::Response`] from the server.
1483    ///
1484    /// # Arguments
1485    /// * `tag` - The tag to add or update.
1486    ///
1487    /// * `tag_info` - Information about the tag, generally containing the
1488    ///   `order` parameter.
1489    ///
1490    /// # Examples
1491    ///
1492    /// ```no_run
1493    /// # use std::str::FromStr;
1494    /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1495    /// # async {
1496    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1497    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1498    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1499    /// use matrix_sdk::ruma::events::tag::TagInfo;
1500    ///
1501    /// if let Some(room) = client.get_room(&room_id) {
1502    ///     let mut tag_info = TagInfo::new();
1503    ///     tag_info.order = Some(0.9);
1504    ///     let user_tag = UserTagName::from_str("u.work")?;
1505    ///
1506    ///     room.set_tag(TagName::User(user_tag), tag_info).await?;
1507    /// }
1508    /// # anyhow::Ok(()) };
1509    /// ```
1510    pub async fn set_tag(
1511        &self,
1512        tag: TagName,
1513        tag_info: TagInfo,
1514    ) -> Result<create_tag::v3::Response> {
1515        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1516        let request = create_tag::v3::Request::new(
1517            user_id.to_owned(),
1518            self.inner.room_id().to_owned(),
1519            tag.to_string(),
1520            tag_info,
1521        );
1522        Ok(self.client.send(request).await?)
1523    }
1524
1525    /// Removes a tag from the room.
1526    ///
1527    /// Returns the [`delete_tag::v3::Response`] from the server.
1528    ///
1529    /// # Arguments
1530    /// * `tag` - The tag to remove.
1531    pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1532        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1533        let request = delete_tag::v3::Request::new(
1534            user_id.to_owned(),
1535            self.inner.room_id().to_owned(),
1536            tag.to_string(),
1537        );
1538        Ok(self.client.send(request).await?)
1539    }
1540
1541    /// Add or remove the `m.favourite` flag for this room.
1542    ///
1543    /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1544    /// room, the tag will be removed too.
1545    ///
1546    /// # Arguments
1547    ///
1548    /// * `is_favourite` - Whether to mark this room as favourite.
1549    /// * `tag_order` - The order of the tag if any.
1550    pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1551        if is_favourite {
1552            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1553
1554            self.set_tag(TagName::Favorite, tag_info).await?;
1555
1556            if self.is_low_priority() {
1557                self.remove_tag(TagName::LowPriority).await?;
1558            }
1559        } else {
1560            self.remove_tag(TagName::Favorite).await?;
1561        }
1562        Ok(())
1563    }
1564
1565    /// Add or remove the `m.lowpriority` flag for this room.
1566    ///
1567    /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1568    /// room, the tag will be removed too.
1569    ///
1570    /// # Arguments
1571    ///
1572    /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1573    /// * `tag_order` - The order of the tag if any.
1574    pub async fn set_is_low_priority(
1575        &self,
1576        is_low_priority: bool,
1577        tag_order: Option<f64>,
1578    ) -> Result<()> {
1579        if is_low_priority {
1580            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1581
1582            self.set_tag(TagName::LowPriority, tag_info).await?;
1583
1584            if self.is_favourite() {
1585                self.remove_tag(TagName::Favorite).await?;
1586            }
1587        } else {
1588            self.remove_tag(TagName::LowPriority).await?;
1589        }
1590        Ok(())
1591    }
1592
1593    /// Sets whether this room is a DM.
1594    ///
1595    /// When setting this room as DM, it will be marked as DM for all active
1596    /// members of the room. When unsetting this room as DM, it will be
1597    /// unmarked as DM for all users, not just the members.
1598    ///
1599    /// # Arguments
1600    /// * `is_direct` - Whether to mark this room as direct.
1601    pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1602        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1603
1604        let mut content = self
1605            .client
1606            .account()
1607            .account_data::<DirectEventContent>()
1608            .await?
1609            .map(|c| c.deserialize())
1610            .transpose()?
1611            .unwrap_or_default();
1612
1613        let this_room_id = self.inner.room_id();
1614
1615        if is_direct {
1616            let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1617            room_members.retain(|member| member.user_id() != self.own_user_id());
1618
1619            for member in room_members {
1620                let entry = content.entry(member.user_id().into()).or_default();
1621                if !entry.iter().any(|room_id| room_id == this_room_id) {
1622                    entry.push(this_room_id.to_owned());
1623                }
1624            }
1625        } else {
1626            for (_, list) in content.iter_mut() {
1627                list.retain(|room_id| *room_id != this_room_id);
1628            }
1629
1630            // Remove user ids that don't have any room marked as DM
1631            content.retain(|_, list| !list.is_empty());
1632        }
1633
1634        let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1635
1636        self.client.send(request).await?;
1637        Ok(())
1638    }
1639
1640    /// Tries to decrypt a room event.
1641    ///
1642    /// # Arguments
1643    /// * `event` - The room event to be decrypted.
1644    ///
1645    /// Returns the decrypted event. In the case of a decryption error, returns
1646    /// a `TimelineEvent` representing the decryption error.
1647    #[cfg(feature = "e2e-encryption")]
1648    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1649    pub async fn decrypt_event(
1650        &self,
1651        event: &Raw<OriginalSyncRoomEncryptedEvent>,
1652        push_ctx: Option<&PushContext>,
1653    ) -> Result<TimelineEvent> {
1654        let machine = self.client.olm_machine().await;
1655        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1656
1657        match machine
1658            .try_decrypt_room_event(
1659                event.cast_ref(),
1660                self.inner.room_id(),
1661                self.client.decryption_settings(),
1662            )
1663            .await?
1664        {
1665            RoomEventDecryptionResult::Decrypted(decrypted) => {
1666                let push_actions = if let Some(push_ctx) = push_ctx {
1667                    Some(push_ctx.for_event(&decrypted.event).await)
1668                } else {
1669                    None
1670                };
1671                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1672            }
1673            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1674                self.client
1675                    .encryption()
1676                    .backups()
1677                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1678                Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1679            }
1680        }
1681    }
1682
1683    /// Tries to decrypt a room event.
1684    ///
1685    /// # Arguments
1686    /// * `event` - The room event to be decrypted.
1687    ///
1688    /// Returns the decrypted event. In the case of a decryption error, returns
1689    /// a `TimelineEvent` representing the decryption error.
1690    #[cfg(feature = "experimental-encrypted-state-events")]
1691    pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1692        &self,
1693        event: &Raw<T>,
1694        push_ctx: Option<&PushContext>,
1695    ) -> Result<TimelineEvent> {
1696        let machine = self.client.olm_machine().await;
1697        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1698
1699        match machine
1700            .try_decrypt_room_event(
1701                event.cast_ref(),
1702                self.inner.room_id(),
1703                self.client.decryption_settings(),
1704            )
1705            .await?
1706        {
1707            RoomEventDecryptionResult::Decrypted(decrypted) => {
1708                let push_actions = if let Some(push_ctx) = push_ctx {
1709                    Some(push_ctx.for_event(&decrypted.event).await)
1710                } else {
1711                    None
1712                };
1713                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1714            }
1715            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1716                self.client
1717                    .encryption()
1718                    .backups()
1719                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1720                // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1721                // event.
1722                Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1723            }
1724        }
1725    }
1726
1727    /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1728    /// session_id.
1729    ///
1730    /// This may be used when we receive an update for a session, and we want to
1731    /// reflect the changes in messages we have received that were encrypted
1732    /// with that session, e.g. to remove a warning shield because a device is
1733    /// now verified.
1734    ///
1735    /// # Arguments
1736    /// * `session_id` - The ID of the Megolm session to get information for.
1737    /// * `sender` - The (claimed) sender of the event where the session was
1738    ///   used.
1739    #[cfg(feature = "e2e-encryption")]
1740    pub async fn get_encryption_info(
1741        &self,
1742        session_id: &str,
1743        sender: &UserId,
1744    ) -> Option<Arc<EncryptionInfo>> {
1745        let machine = self.client.olm_machine().await;
1746        let machine = machine.as_ref()?;
1747        machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1748    }
1749
1750    /// Forces the currently active room key, which is used to encrypt messages,
1751    /// to be rotated.
1752    ///
1753    /// A new room key will be crated and shared with all the room members the
1754    /// next time a message will be sent. You don't have to call this method,
1755    /// room keys will be rotated automatically when necessary. This method is
1756    /// still useful for debugging purposes.
1757    ///
1758    /// For more info please take a look a the [`encryption`] module
1759    /// documentation.
1760    ///
1761    /// [`encryption`]: crate::encryption
1762    #[cfg(feature = "e2e-encryption")]
1763    pub async fn discard_room_key(&self) -> Result<()> {
1764        let machine = self.client.olm_machine().await;
1765        if let Some(machine) = machine.as_ref() {
1766            machine.discard_room_key(self.inner.room_id()).await?;
1767            Ok(())
1768        } else {
1769            Err(Error::NoOlmMachine)
1770        }
1771    }
1772
1773    /// Ban the user with `UserId` from this room.
1774    ///
1775    /// # Arguments
1776    ///
1777    /// * `user_id` - The user to ban with `UserId`.
1778    ///
1779    /// * `reason` - The reason for banning this user.
1780    #[instrument(skip_all)]
1781    pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1782        let request = assign!(
1783            ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1784            { reason: reason.map(ToOwned::to_owned) }
1785        );
1786        self.client.send(request).await?;
1787        Ok(())
1788    }
1789
1790    /// Unban the user with `UserId` from this room.
1791    ///
1792    /// # Arguments
1793    ///
1794    /// * `user_id` - The user to unban with `UserId`.
1795    ///
1796    /// * `reason` - The reason for unbanning this user.
1797    #[instrument(skip_all)]
1798    pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1799        let request = assign!(
1800            unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1801            { reason: reason.map(ToOwned::to_owned) }
1802        );
1803        self.client.send(request).await?;
1804        Ok(())
1805    }
1806
1807    /// Kick a user out of this room.
1808    ///
1809    /// # Arguments
1810    ///
1811    /// * `user_id` - The `UserId` of the user that should be kicked out of the
1812    ///   room.
1813    ///
1814    /// * `reason` - Optional reason why the room member is being kicked out.
1815    #[instrument(skip_all)]
1816    pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1817        let request = assign!(
1818            kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1819            { reason: reason.map(ToOwned::to_owned) }
1820        );
1821        self.client.send(request).await?;
1822        Ok(())
1823    }
1824
1825    /// Invite the specified user by `UserId` to this room.
1826    ///
1827    /// # Arguments
1828    ///
1829    /// * `user_id` - The `UserId` of the user to invite to the room.
1830    #[instrument(skip_all)]
1831    pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1832        #[cfg(feature = "e2e-encryption")]
1833        if self.client.inner.enable_share_history_on_invite {
1834            shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1835        }
1836
1837        let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1838        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1839        self.client.send(request).await?;
1840
1841        // Force a future room members reload before sending any event to prevent UTDs
1842        // that can happen when some event is sent after a room member has been invited
1843        // but before the /sync request could fetch the membership change event.
1844        self.mark_members_missing();
1845
1846        Ok(())
1847    }
1848
1849    /// Invite the specified user by third party id to this room.
1850    ///
1851    /// # Arguments
1852    ///
1853    /// * `invite_id` - A third party id of a user to invite to the room.
1854    #[instrument(skip_all)]
1855    pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1856        let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1857        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1858        self.client.send(request).await?;
1859
1860        // Force a future room members reload before sending any event to prevent UTDs
1861        // that can happen when some event is sent after a room member has been invited
1862        // but before the /sync request could fetch the membership change event.
1863        self.mark_members_missing();
1864
1865        Ok(())
1866    }
1867
1868    /// Activate typing notice for this room.
1869    ///
    /// The typing notice remains active for 4s. It can be deactivated at any
1871    /// point by setting typing to `false`. If this method is called while
1872    /// the typing notice is active nothing will happen. This method can be
1873    /// called on every key stroke, since it will do nothing while typing is
1874    /// active.
1875    ///
1876    /// # Arguments
1877    ///
1878    /// * `typing` - Whether the user is typing or has stopped typing.
1879    ///
1880    /// # Examples
1881    ///
1882    /// ```no_run
1883    /// use std::time::Duration;
1884    ///
1885    /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
1886    /// # use matrix_sdk::{
1887    /// #     Client, config::SyncSettings,
1888    /// #     ruma::room_id,
1889    /// # };
1890    /// # use url::Url;
1891    ///
1892    /// # async {
1893    /// # let homeserver = Url::parse("http://localhost:8080")?;
1894    /// # let client = Client::new(homeserver).await?;
1895    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
1896    ///
1897    /// if let Some(room) = client.get_room(&room_id) {
1898    ///     room.typing_notice(true).await?
1899    /// }
1900    /// # anyhow::Ok(()) };
1901    /// ```
1902    pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1903        self.ensure_room_joined()?;
1904
1905        // Only send a request to the homeserver if the old timeout has elapsed
1906        // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
1907        let send = if let Some(typing_time) =
1908            self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1909        {
1910            if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1911                // We always reactivate the typing notice if typing is true or
1912                // we may need to deactivate it if it's
1913                // currently active if typing is false
1914                typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1915            } else {
1916                // Only send a request when we need to deactivate typing
1917                !typing
1918            }
1919        } else {
1920            // Typing notice is currently deactivated, therefore, send a request
1921            // only when it's about to be activated
1922            typing
1923        };
1924
1925        if send {
1926            self.send_typing_notice(typing).await?;
1927        }
1928
1929        Ok(())
1930    }
1931
1932    #[instrument(name = "typing_notice", skip(self))]
1933    async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1934        let typing = if typing {
1935            self.client
1936                .inner
1937                .typing_notice_times
1938                .write()
1939                .unwrap()
1940                .insert(self.room_id().to_owned(), Instant::now());
1941            Typing::Yes(TYPING_NOTICE_TIMEOUT)
1942        } else {
1943            self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1944            Typing::No
1945        };
1946
1947        let request = create_typing_event::v3::Request::new(
1948            self.own_user_id().to_owned(),
1949            self.room_id().to_owned(),
1950            typing,
1951        );
1952
1953        self.client.send(request).await?;
1954
1955        Ok(())
1956    }
1957
1958    /// Send a request to set a single receipt.
1959    ///
1960    /// If an unthreaded receipt is sent, this will also unset the unread flag
1961    /// of the room if necessary.
1962    ///
1963    /// # Arguments
1964    ///
1965    /// * `receipt_type` - The type of the receipt to set. Note that it is
1966    ///   possible to set the fully-read marker although it is technically not a
1967    ///   receipt.
1968    ///
1969    /// * `thread` - The thread where this receipt should apply, if any. Note
1970    ///   that this must be [`ReceiptThread::Unthreaded`] when sending a
1971    ///   [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
1972    ///
1973    /// * `event_id` - The `EventId` of the event to set the receipt on.
1974    #[instrument(skip_all)]
1975    pub async fn send_single_receipt(
1976        &self,
1977        receipt_type: create_receipt::v3::ReceiptType,
1978        thread: ReceiptThread,
1979        event_id: OwnedEventId,
1980    ) -> Result<()> {
1981        // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
1982        // string key.
1983        let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1984
1985        self.client
1986            .inner
1987            .locks
1988            .read_receipt_deduplicated_handler
1989            .run((request_key, event_id.clone()), async {
1990                // We will unset the unread flag if we send an unthreaded receipt.
1991                let is_unthreaded = thread == ReceiptThread::Unthreaded;
1992
1993                let mut request = create_receipt::v3::Request::new(
1994                    self.room_id().to_owned(),
1995                    receipt_type,
1996                    event_id,
1997                );
1998                request.thread = thread;
1999
2000                self.client.send(request).await?;
2001
2002                if is_unthreaded {
2003                    self.set_unread_flag(false).await?;
2004                }
2005
2006                Ok(())
2007            })
2008            .await
2009    }
2010
2011    /// Send a request to set multiple receipts at once.
2012    ///
2013    /// This will also unset the unread flag of the room if necessary.
2014    ///
2015    /// # Arguments
2016    ///
2017    /// * `receipts` - The `Receipts` to send.
2018    ///
2019    /// If `receipts` is empty, this is a no-op.
2020    #[instrument(skip_all)]
2021    pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2022        if receipts.is_empty() {
2023            return Ok(());
2024        }
2025
2026        let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2027        let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2028            fully_read,
2029            read_receipt: public_read_receipt,
2030            private_read_receipt,
2031        });
2032
2033        self.client.send(request).await?;
2034
2035        self.set_unread_flag(false).await?;
2036
2037        Ok(())
2038    }
2039
2040    /// Helper function to enable End-to-end encryption in this room.
2041    /// `encrypted_state_events` is not used unless the
2042    /// `experimental-encrypted-state-events` feature is enabled.
2043    #[allow(unused_variables, unused_mut)]
2044    async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2045        use ruma::{
2046            events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm,
2047        };
2048        const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2049
2050        if !self.latest_encryption_state().await?.is_encrypted() {
2051            let mut content =
2052                RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2053            #[cfg(feature = "experimental-encrypted-state-events")]
2054            if encrypted_state_events {
2055                content = content.with_encrypted_state();
2056            }
2057            self.send_state_event(content).await?;
2058
2059            // Spin on the sync beat event, since the first sync we receive might not
2060            // include the encryption event.
2061            //
2062            // TODO do we want to return an error here if we time out? This
2063            // could be quite useful if someone wants to enable encryption and
2064            // send a message right after it's enabled.
2065            let res = timeout(
2066                async {
2067                    loop {
2068                        // Listen for sync events, then check if the encryption state is known.
2069                        self.client.inner.sync_beat.listen().await;
2070                        let _sync_lock = self.client.base_client().sync_lock().lock().await;
2071                        if !self.inner.encryption_state().is_unknown() {
2072                            break;
2073                        }
2074                    }
2075                },
2076                SYNC_WAIT_TIME,
2077            )
2078            .await;
2079
2080            let _sync_lock = self.client.base_client().sync_lock().lock().await;
2081
2082            // If encryption was enabled, return.
2083            #[cfg(not(feature = "experimental-encrypted-state-events"))]
2084            if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2085                debug!("room successfully marked as encrypted");
2086                return Ok(());
2087            }
2088
2089            // If encryption with state event encryption was enabled, return.
2090            #[cfg(feature = "experimental-encrypted-state-events")]
2091            if res.is_ok() && {
2092                if encrypted_state_events {
2093                    self.inner.encryption_state().is_state_encrypted()
2094                } else {
2095                    self.inner.encryption_state().is_encrypted()
2096                }
2097            } {
2098                debug!("room successfully marked as encrypted");
2099                return Ok(());
2100            }
2101
2102            // If after waiting for multiple syncs, we don't have the encryption state we
2103            // expect, assume the local encryption state is incorrect; this will
2104            // cause the SDK to re-request it later for confirmation, instead of
2105            // assuming it's sync'd and correct (and not encrypted).
2106            debug!("still not marked as encrypted, marking encryption state as missing");
2107
2108            let mut room_info = self.clone_info();
2109            room_info.mark_encryption_state_missing();
2110            let mut changes = StateChanges::default();
2111            changes.add_room(room_info.clone());
2112
2113            self.client.state_store().save_changes(&changes).await?;
2114            self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2115        }
2116
2117        Ok(())
2118    }
2119
2120    /// Enable End-to-end encryption in this room.
2121    ///
2122    /// This method will be a noop if encryption is already enabled, otherwise
2123    /// sends a `m.room.encryption` state event to the room. This might fail if
2124    /// you don't have the appropriate power level to enable end-to-end
2125    /// encryption.
2126    ///
2127    /// A sync needs to be received to update the local room state. This method
2128    /// will wait for a sync to be received, this might time out if no
2129    /// sync loop is running or if the server is slow.
2130    ///
2131    /// # Examples
2132    ///
2133    /// ```no_run
2134    /// # use matrix_sdk::{
2135    /// #     Client, config::SyncSettings,
2136    /// #     ruma::room_id,
2137    /// # };
2138    /// # use url::Url;
2139    /// #
2140    /// # async {
2141    /// # let homeserver = Url::parse("http://localhost:8080")?;
2142    /// # let client = Client::new(homeserver).await?;
2143    /// # let room_id = room_id!("!test:localhost");
2144    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2145    ///
2146    /// if let Some(room) = client.get_room(&room_id) {
2147    ///     room.enable_encryption().await?
2148    /// }
2149    /// # anyhow::Ok(()) };
2150    /// ```
2151    #[instrument(skip_all)]
2152    pub async fn enable_encryption(&self) -> Result<()> {
2153        self.enable_encryption_inner(false).await
2154    }
2155
2156    /// Enable End-to-end encryption in this room, opting into experimental
2157    /// state event encryption.
2158    ///
2159    /// This method will be a noop if encryption is already enabled, otherwise
2160    /// sends a `m.room.encryption` state event to the room. This might fail if
2161    /// you don't have the appropriate power level to enable end-to-end
2162    /// encryption.
2163    ///
2164    /// A sync needs to be received to update the local room state. This method
2165    /// will wait for a sync to be received, this might time out if no
2166    /// sync loop is running or if the server is slow.
2167    ///
2168    /// # Examples
2169    ///
2170    /// ```no_run
2171    /// # use matrix_sdk::{
2172    /// #     Client, config::SyncSettings,
2173    /// #     ruma::room_id,
2174    /// # };
2175    /// # use url::Url;
2176    /// #
2177    /// # async {
2178    /// # let homeserver = Url::parse("http://localhost:8080")?;
2179    /// # let client = Client::new(homeserver).await?;
2180    /// # let room_id = room_id!("!test:localhost");
2181    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2182    ///
2183    /// if let Some(room) = client.get_room(&room_id) {
2184    ///     room.enable_encryption_with_state_event_encryption().await?
2185    /// }
2186    /// # anyhow::Ok(()) };
2187    /// ```
2188    #[instrument(skip_all)]
2189    #[cfg(feature = "experimental-encrypted-state-events")]
2190    pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2191        self.enable_encryption_inner(true).await
2192    }
2193
2194    /// Share a room key with users in the given room.
2195    ///
2196    /// This will create Olm sessions with all the users/device pairs in the
2197    /// room if necessary and share a room key that can be shared with them.
2198    ///
2199    /// Does nothing if no room key needs to be shared.
2200    // TODO: expose this publicly so people can pre-share a group session if
2201    // e.g. a user starts to type a message for a room.
2202    #[cfg(feature = "e2e-encryption")]
2203    #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
2204    async fn preshare_room_key(&self) -> Result<()> {
2205        self.ensure_room_joined()?;
2206
2207        // Take and release the lock on the store, if needs be.
2208        let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2209        tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
2210
2211        self.client
2212            .locks()
2213            .group_session_deduplicated_handler
2214            .run(self.room_id().to_owned(), async move {
2215                {
2216                    let members = self
2217                        .client
2218                        .state_store()
2219                        .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2220                        .await?;
2221                    self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2222                };
2223
2224                let response = self.share_room_key().await;
2225
2226                // If one of the responses failed invalidate the group
2227                // session as using it would end up in undecryptable
2228                // messages.
2229                if let Err(r) = response {
2230                    let machine = self.client.olm_machine().await;
2231                    if let Some(machine) = machine.as_ref() {
2232                        machine.discard_room_key(self.room_id()).await?;
2233                    }
2234                    return Err(r);
2235                }
2236
2237                Ok(())
2238            })
2239            .await
2240    }
2241
2242    /// Share a group session for a room.
2243    ///
2244    /// # Panics
2245    ///
2246    /// Panics if the client isn't logged in.
2247    #[cfg(feature = "e2e-encryption")]
2248    #[instrument(skip_all)]
2249    async fn share_room_key(&self) -> Result<()> {
2250        self.ensure_room_joined()?;
2251
2252        let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2253
2254        for request in requests {
2255            let response = self.client.send_to_device(&request).await?;
2256            self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2257        }
2258
2259        Ok(())
2260    }
2261
2262    /// Wait for the room to be fully synced.
2263    ///
2264    /// This method makes sure the room that was returned when joining a room
2265    /// has been echoed back in the sync.
2266    ///
2267    /// Warning: This waits until a sync happens and does not return if no sync
2268    /// is happening. It can also return early when the room is not a joined
2269    /// room anymore.
2270    #[instrument(skip_all)]
2271    pub async fn sync_up(&self) {
2272        while !self.is_synced() && self.state() == RoomState::Joined {
2273            let wait_for_beat = self.client.inner.sync_beat.listen();
2274            // We don't care whether it's a timeout or a sync beat.
2275            let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2276        }
2277    }
2278
2279    /// Send a message-like event to this room.
2280    ///
2281    /// Returns the parsed response from the server.
2282    ///
2283    /// If the encryption feature is enabled this method will transparently
2284    /// encrypt the event if this room is encrypted (except for `m.reaction`
2285    /// events, which are never encrypted).
2286    ///
2287    /// **Note**: If you just want to send an event with custom JSON content to
2288    /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2289    ///
2290    /// If you want to set a transaction ID for the event, use
2291    /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2292    /// on the returned value before `.await`ing it.
2293    ///
2294    /// # Arguments
2295    ///
2296    /// * `content` - The content of the message event.
2297    ///
2298    /// # Examples
2299    ///
2300    /// ```no_run
2301    /// # use std::sync::{Arc, RwLock};
2302    /// # use matrix_sdk::{Client, config::SyncSettings};
2303    /// # use url::Url;
2304    /// # use matrix_sdk::ruma::room_id;
2305    /// # use serde::{Deserialize, Serialize};
2306    /// use matrix_sdk::ruma::{
2307    ///     events::{
2308    ///         macros::EventContent,
2309    ///         room::message::{RoomMessageEventContent, TextMessageEventContent},
2310    ///     },
2311    ///     uint, MilliSecondsSinceUnixEpoch, TransactionId,
2312    /// };
2313    ///
2314    /// # async {
2315    /// # let homeserver = Url::parse("http://localhost:8080")?;
2316    /// # let mut client = Client::new(homeserver).await?;
2317    /// # let room_id = room_id!("!test:localhost");
2318    /// let content = RoomMessageEventContent::text_plain("Hello world");
2319    /// let txn_id = TransactionId::new();
2320    ///
2321    /// if let Some(room) = client.get_room(&room_id) {
2322    ///     room.send(content).with_transaction_id(txn_id).await?;
2323    /// }
2324    ///
2325    /// // Custom events work too:
2326    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2327    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2328    /// struct TokenEventContent {
2329    ///     token: String,
2330    ///     #[serde(rename = "exp")]
2331    ///     expires_at: MilliSecondsSinceUnixEpoch,
2332    /// }
2333    ///
2334    /// # fn generate_token() -> String { todo!() }
2335    /// let content = TokenEventContent {
2336    ///     token: generate_token(),
2337    ///     expires_at: {
2338    ///         let now = MilliSecondsSinceUnixEpoch::now();
2339    ///         MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2340    ///     },
2341    /// };
2342    ///
2343    /// if let Some(room) = client.get_room(&room_id) {
2344    ///     room.send(content).await?;
2345    /// }
2346    /// # anyhow::Ok(()) };
2347    /// ```
2348    pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2349        SendMessageLikeEvent::new(self, content)
2350    }
2351
2352    /// Run /keys/query requests for all the non-tracked users, and for users
2353    /// with an out-of-date device list.
2354    #[cfg(feature = "e2e-encryption")]
2355    async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2356        let olm = self.client.olm_machine().await;
2357        let olm = olm.as_ref().expect("Olm machine wasn't started");
2358
2359        let members =
2360            self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2361
2362        let tracked: HashMap<_, _> = olm
2363            .store()
2364            .load_tracked_users()
2365            .await?
2366            .into_iter()
2367            .map(|tracked| (tracked.user_id, tracked.dirty))
2368            .collect();
2369
2370        // A member has no unknown devices iff it was tracked *and* the tracking is
2371        // not considered dirty.
2372        let members_with_unknown_devices =
2373            members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2374
2375        let (req_id, request) =
2376            olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2377
2378        if !request.device_keys.is_empty() {
2379            self.client.keys_query(&req_id, request.device_keys).await?;
2380        }
2381
2382        Ok(())
2383    }
2384
2385    /// Send a message-like event with custom JSON content to this room.
2386    ///
2387    /// Returns the parsed response from the server.
2388    ///
2389    /// If the encryption feature is enabled this method will transparently
2390    /// encrypt the event if this room is encrypted (except for `m.reaction`
2391    /// events, which are never encrypted).
2392    ///
2393    /// This method is equivalent to the [`send()`][Self::send] method but
2394    /// allows sending custom JSON payloads, e.g. constructed using the
2395    /// [`serde_json::json!()`] macro.
2396    ///
2397    /// If you want to set a transaction ID for the event, use
2398    /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2399    /// on the returned value before `.await`ing it.
2400    ///
2401    /// # Arguments
2402    ///
2403    /// * `event_type` - The type of the event.
2404    ///
2405    /// * `content` - The content of the event as a raw JSON value. The argument
2406    ///   type can be `serde_json::Value`, but also other raw JSON types; for
2407    ///   the full list check the documentation of
2408    ///   [`IntoRawMessageLikeEventContent`].
2409    ///
2410    /// # Examples
2411    ///
2412    /// ```no_run
2413    /// # use std::sync::{Arc, RwLock};
2414    /// # use matrix_sdk::{Client, config::SyncSettings};
2415    /// # use url::Url;
2416    /// # use matrix_sdk::ruma::room_id;
2417    /// # async {
2418    /// # let homeserver = Url::parse("http://localhost:8080")?;
2419    /// # let mut client = Client::new(homeserver).await?;
2420    /// # let room_id = room_id!("!test:localhost");
2421    /// use serde_json::json;
2422    ///
2423    /// if let Some(room) = client.get_room(&room_id) {
2424    ///     room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2425    /// }
2426    /// # anyhow::Ok(()) };
2427    /// ```
    #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
    pub fn send_raw<'a>(
        &'a self,
        event_type: &'a str,
        content: impl IntoRawMessageLikeEventContent,
    ) -> SendRawMessageLikeEvent<'a> {
        // This only constructs the `SendRawMessageLikeEvent` and hands it back to
        // the caller.
        //
        // Note: the recorded instrument fields are saved in
        // `SendRawMessageLikeEvent::into_future`.
        SendRawMessageLikeEvent::new(self, event_type, content)
    }
2438
2439    /// Send an attachment to this room.
2440    ///
2441    /// This will upload the given data that the reader produces using the
2442    /// [`upload()`] method and post an event to the given room.
2443    /// If the room is encrypted and the encryption feature is enabled the
2444    /// upload will be encrypted.
2445    ///
2446    /// This is a convenience method that calls the
2447    /// [`upload()`] and afterwards the [`send()`].
2448    ///
2449    /// # Arguments
2450    /// * `filename` - The file name.
2451    ///
2452    /// * `content_type` - The type of the media, this will be used as the
2453    /// content-type header.
2454    ///
2455    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2456    /// media.
2457    ///
2458    /// * `config` - Metadata and configuration for the attachment.
2459    ///
2460    /// # Examples
2461    ///
2462    /// ```no_run
2463    /// # use std::fs;
2464    /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2465    /// # use url::Url;
2466    /// # use mime;
2467    /// # async {
2468    /// # let homeserver = Url::parse("http://localhost:8080")?;
2469    /// # let mut client = Client::new(homeserver).await?;
2470    /// # let room_id = room_id!("!test:localhost");
2471    /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2472    ///
2473    /// if let Some(room) = client.get_room(&room_id) {
2474    ///     room.send_attachment(
2475    ///         "my_favorite_cat.jpg",
2476    ///         &mime::IMAGE_JPEG,
2477    ///         image,
2478    ///         AttachmentConfig::new(),
2479    ///     ).await?;
2480    /// }
2481    /// # anyhow::Ok(()) };
2482    /// ```
2483    ///
2484    /// [`upload()`]: crate::Media::upload
2485    /// [`send()`]: Self::send
2486    #[instrument(skip_all)]
2487    pub fn send_attachment<'a>(
2488        &'a self,
2489        filename: impl Into<String>,
2490        content_type: &'a Mime,
2491        data: Vec<u8>,
2492        config: AttachmentConfig,
2493    ) -> SendAttachment<'a> {
2494        SendAttachment::new(self, filename.into(), content_type, data, config)
2495    }
2496
2497    /// Prepare and send an attachment to this room.
2498    ///
2499    /// This will upload the given data that the reader produces using the
2500    /// [`upload()`](#method.upload) method and post an event to the given room.
2501    /// If the room is encrypted and the encryption feature is enabled the
2502    /// upload will be encrypted.
2503    ///
2504    /// This is a convenience method that calls the
2505    /// [`Client::upload()`](#Client::method.upload) and afterwards the
2506    /// [`send()`](#method.send).
2507    ///
2508    /// # Arguments
2509    /// * `filename` - The file name.
2510    ///
2511    /// * `content_type` - The type of the media, this will be used as the
2512    ///   content-type header.
2513    ///
2514    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2515    ///   media.
2516    ///
2517    /// * `config` - Metadata and configuration for the attachment.
2518    ///
2519    /// * `send_progress` - An observable to transmit forward progress about the
2520    ///   upload.
2521    ///
2522    /// * `store_in_cache` - A boolean defining whether the uploaded media will
2523    ///   be stored in the cache immediately after a successful upload.
2524    #[instrument(skip_all)]
2525    pub(super) async fn prepare_and_send_attachment<'a>(
2526        &'a self,
2527        filename: String,
2528        content_type: &'a Mime,
2529        data: Vec<u8>,
2530        mut config: AttachmentConfig,
2531        send_progress: SharedObservable<TransmissionProgress>,
2532        store_in_cache: bool,
2533    ) -> Result<send_message_event::v3::Response> {
2534        self.ensure_room_joined()?;
2535
2536        let txn_id = config.txn_id.take();
2537        let mentions = config.mentions.take();
2538
2539        let thumbnail = config.thumbnail.take();
2540
2541        // If necessary, store caching data for the thumbnail ahead of time.
2542        let thumbnail_cache_info = if store_in_cache {
2543            thumbnail
2544                .as_ref()
2545                .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2546        } else {
2547            None
2548        };
2549
2550        #[cfg(feature = "e2e-encryption")]
2551        let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2552            self.client
2553                .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2554                .await?
2555        } else {
2556            self.client
2557                .media()
2558                .upload_plain_media_and_thumbnail(
2559                    content_type,
2560                    // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2561                    // similar.
2562                    data.clone(),
2563                    thumbnail,
2564                    send_progress,
2565                )
2566                .await?
2567        };
2568
2569        #[cfg(not(feature = "e2e-encryption"))]
2570        let (media_source, thumbnail) = self
2571            .client
2572            .media()
2573            .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2574            .await?;
2575
2576        if store_in_cache {
2577            let cache_store_lock_guard = self.client.event_cache_store().lock().await?;
2578
2579            // A failure to cache shouldn't prevent the whole upload from finishing
2580            // properly, so only log errors during caching.
2581
2582            debug!("caching the media");
2583            let request =
2584                MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2585
2586            if let Err(err) = cache_store_lock_guard
2587                .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2588                .await
2589            {
2590                warn!("unable to cache the media after uploading it: {err}");
2591            }
2592
2593            if let Some(((data, height, width), source)) =
2594                thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2595            {
2596                debug!("caching the thumbnail");
2597
2598                let request = MediaRequestParameters {
2599                    source: source.clone(),
2600                    format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2601                };
2602
2603                if let Err(err) = cache_store_lock_guard
2604                    .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2605                    .await
2606                {
2607                    warn!("unable to cache the media after uploading it: {err}");
2608                }
2609            }
2610        }
2611
2612        let content = self
2613            .make_media_event(
2614                Room::make_attachment_type(
2615                    content_type,
2616                    filename,
2617                    media_source,
2618                    config.caption,
2619                    config.formatted_caption,
2620                    config.info,
2621                    thumbnail,
2622                ),
2623                mentions,
2624                config.reply,
2625            )
2626            .await?;
2627
2628        let mut fut = self.send(content);
2629        if let Some(txn_id) = txn_id {
2630            fut = fut.with_transaction_id(txn_id);
2631        }
2632        fut.await
2633    }
2634
    /// Creates the inner [`MessageType`] for an already-uploaded media file
    /// provided by its source.
    ///
    /// All arguments are forwarded unchanged to the `make_media_type!` macro,
    /// with `MessageType` passed as its first argument.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn make_attachment_type(
        content_type: &Mime,
        filename: String,
        source: MediaSource,
        caption: Option<String>,
        formatted_caption: Option<FormattedBody>,
        info: Option<AttachmentInfo>,
        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
    ) -> MessageType {
        make_media_type!(
            MessageType,
            content_type,
            filename,
            source,
            caption,
            formatted_caption,
            info,
            thumbnail
        )
    }
2658
2659    /// Creates the [`RoomMessageEventContent`] based on the message type,
2660    /// mentions and reply information.
2661    pub(crate) async fn make_media_event(
2662        &self,
2663        msg_type: MessageType,
2664        mentions: Option<Mentions>,
2665        reply: Option<Reply>,
2666    ) -> Result<RoomMessageEventContent> {
2667        let mut content = RoomMessageEventContent::new(msg_type);
2668        if let Some(mentions) = mentions {
2669            content = content.add_mentions(mentions);
2670        }
2671        if let Some(reply) = reply {
2672            // Since we just created the event, there is no relation attached to it. Thus,
2673            // it is safe to add the reply relation without overriding anything.
2674            content = self.make_reply_event(content.into(), reply).await?;
2675        }
2676        Ok(content)
2677    }
2678
    /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
    /// provided by its source.
    ///
    /// All arguments are forwarded unchanged to the `make_media_type!` macro,
    /// with `GalleryItemType` passed as its first argument.
    #[cfg(feature = "unstable-msc4274")]
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn make_gallery_item_type(
        content_type: &Mime,
        filename: String,
        source: MediaSource,
        caption: Option<String>,
        formatted_caption: Option<FormattedBody>,
        info: Option<AttachmentInfo>,
        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
    ) -> GalleryItemType {
        make_media_type!(
            GalleryItemType,
            content_type,
            filename,
            source,
            caption,
            formatted_caption,
            info,
            thumbnail
        )
    }
2703
2704    /// Update the power levels of a select set of users of this room.
2705    ///
2706    /// Issue a `power_levels` state event request to the server, changing the
2707    /// given UserId -> Int levels. May fail if the `power_levels` aren't
2708    /// locally known yet or the server rejects the state event update, e.g.
2709    /// because of insufficient permissions. Neither permissions to update
2710    /// nor whether the data might be stale is checked prior to issuing the
2711    /// request.
2712    pub async fn update_power_levels(
2713        &self,
2714        updates: Vec<(&UserId, Int)>,
2715    ) -> Result<send_state_event::v3::Response> {
2716        let mut power_levels = self.power_levels().await?;
2717
2718        for (user_id, new_level) in updates {
2719            if new_level == power_levels.users_default {
2720                power_levels.users.remove(user_id);
2721            } else {
2722                power_levels.users.insert(user_id.to_owned(), new_level);
2723            }
2724        }
2725
2726        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2727    }
2728
2729    /// Applies a set of power level changes to this room.
2730    ///
2731    /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2732    /// remain unchanged.
2733    pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2734        let mut power_levels = self.power_levels().await?;
2735        power_levels.apply(changes)?;
2736        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2737        Ok(())
2738    }
2739
2740    /// Resets the room's power levels to the default values
2741    ///
2742    /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2743    pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2744        let creators = self.creators().unwrap_or_default();
2745        let rules = self.clone_info().room_version_rules_or_default();
2746
2747        let default_power_levels =
2748            RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2749        let changes = RoomPowerLevelChanges::from(default_power_levels);
2750        self.apply_power_level_changes(changes).await?;
2751        Ok(self.power_levels().await?)
2752    }
2753
2754    /// Gets the suggested role for the user with the provided `user_id`.
2755    ///
2756    /// This method checks the `RoomPowerLevels` events instead of loading the
2757    /// member list and looking for the member.
2758    pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2759        let power_level = self.get_user_power_level(user_id).await?;
2760        Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2761    }
2762
2763    /// Gets the power level the user with the provided `user_id`.
2764    ///
2765    /// This method checks the `RoomPowerLevels` events instead of loading the
2766    /// member list and looking for the member.
2767    pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2768        let event = self.power_levels().await?;
2769        Ok(event.for_user(user_id))
2770    }
2771
2772    /// Gets a map with the `UserId` of users with power levels other than `0`
2773    /// and this power level.
2774    pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2775        let power_levels = self.power_levels().await.ok();
2776        let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2777        if let Some(power_levels) = power_levels {
2778            for (id, level) in power_levels.users.into_iter() {
2779                user_power_levels.insert(id, level.into());
2780            }
2781        }
2782        user_power_levels
2783    }
2784
2785    /// Sets the name of this room.
2786    pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2787        self.send_state_event(RoomNameEventContent::new(name)).await
2788    }
2789
2790    /// Sets a new topic for this room.
2791    pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2792        self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2793    }
2794
2795    /// Sets the new avatar url for this room.
2796    ///
2797    /// # Arguments
2798    /// * `avatar_url` - The owned Matrix uri that represents the avatar
2799    /// * `info` - The optional image info that can be provided for the avatar
2800    pub async fn set_avatar_url(
2801        &self,
2802        url: &MxcUri,
2803        info: Option<avatar::ImageInfo>,
2804    ) -> Result<send_state_event::v3::Response> {
2805        self.ensure_room_joined()?;
2806
2807        let mut room_avatar_event = RoomAvatarEventContent::new();
2808        room_avatar_event.url = Some(url.to_owned());
2809        room_avatar_event.info = info.map(Box::new);
2810
2811        self.send_state_event(room_avatar_event).await
2812    }
2813
2814    /// Removes the avatar from the room
2815    pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2816        self.send_state_event(RoomAvatarEventContent::new()).await
2817    }
2818
2819    /// Uploads a new avatar for this room.
2820    ///
2821    /// # Arguments
2822    /// * `mime` - The mime type describing the data
2823    /// * `data` - The data representation of the avatar
2824    /// * `info` - The optional image info provided for the avatar, the blurhash
2825    ///   and the mimetype will always be updated
2826    pub async fn upload_avatar(
2827        &self,
2828        mime: &Mime,
2829        data: Vec<u8>,
2830        info: Option<avatar::ImageInfo>,
2831    ) -> Result<send_state_event::v3::Response> {
2832        self.ensure_room_joined()?;
2833
2834        let upload_response = self.client.media().upload(mime, data, None).await?;
2835        let mut info = info.unwrap_or_default();
2836        info.blurhash = upload_response.blurhash;
2837        info.mimetype = Some(mime.to_string());
2838
2839        self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2840    }
2841
2842    /// Send a state event with an empty state key to the homeserver.
2843    ///
2844    /// For state events with a non-empty state key, see
2845    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
2846    ///
2847    /// Returns the parsed response from the server.
2848    ///
2849    /// # Arguments
2850    ///
2851    /// * `content` - The content of the state event.
2852    ///
2853    /// # Examples
2854    ///
2855    /// ```no_run
2856    /// # use serde::{Deserialize, Serialize};
2857    /// # async {
2858    /// # let joined_room: matrix_sdk::Room = todo!();
2859    /// use matrix_sdk::ruma::{
2860    ///     events::{
2861    ///         macros::EventContent, room::encryption::RoomEncryptionEventContent,
2862    ///         EmptyStateKey,
2863    ///     },
2864    ///     EventEncryptionAlgorithm,
2865    /// };
2866    ///
2867    /// let encryption_event_content = RoomEncryptionEventContent::new(
2868    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
2869    /// );
2870    /// joined_room.send_state_event(encryption_event_content).await?;
2871    ///
2872    /// // Custom event:
2873    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2874    /// #[ruma_event(
2875    ///     type = "org.matrix.msc_9000.xxx",
2876    ///     kind = State,
2877    ///     state_key_type = EmptyStateKey,
2878    /// )]
2879    /// struct XxxStateEventContent {/* fields... */}
2880    ///
2881    /// let content: XxxStateEventContent = todo!();
2882    /// joined_room.send_state_event(content).await?;
2883    /// # anyhow::Ok(()) };
2884    /// ```
2885    #[cfg(not(feature = "experimental-encrypted-state-events"))]
2886    #[instrument(skip_all)]
2887    pub async fn send_state_event(
2888        &self,
2889        content: impl StateEventContent<StateKey = EmptyStateKey>,
2890    ) -> Result<send_state_event::v3::Response> {
2891        self.send_state_event_for_key(&EmptyStateKey, content).await
2892    }
2893
2894    /// Send a state event with an empty state key to the homeserver.
2895    ///
2896    /// For state events with a non-empty state key, see
2897    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
2898    ///
2899    /// If the experimental state event encryption feature is enabled, this
2900    /// method will transparently encrypt the event if this room is
2901    /// encrypted (except if the event type is considered critical for the room
2902    /// to function, as outlined in [MSC3414][msc3414]).
2903    ///
2904    /// Returns the parsed response from the server.
2905    ///
2906    /// # Arguments
2907    ///
2908    /// * `content` - The content of the state event.
2909    ///
2910    /// # Examples
2911    ///
2912    /// ```no_run
2913    /// # use serde::{Deserialize, Serialize};
2914    /// # async {
2915    /// # let joined_room: matrix_sdk::Room = todo!();
2916    /// use matrix_sdk::ruma::{
2917    ///     events::{
2918    ///         macros::EventContent, room::encryption::RoomEncryptionEventContent,
2919    ///         EmptyStateKey,
2920    ///     },
2921    ///     EventEncryptionAlgorithm,
2922    /// };
2923    ///
2924    /// let encryption_event_content = RoomEncryptionEventContent::new(
2925    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
2926    /// );
2927    /// joined_room.send_state_event(encryption_event_content).await?;
2928    ///
2929    /// // Custom event:
2930    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2931    /// #[ruma_event(
2932    ///     type = "org.matrix.msc_9000.xxx",
2933    ///     kind = State,
2934    ///     state_key_type = EmptyStateKey,
2935    /// )]
2936    /// struct XxxStateEventContent {/* fields... */}
2937    ///
2938    /// let content: XxxStateEventContent = todo!();
2939    /// joined_room.send_state_event(content).await?;
2940    /// # anyhow::Ok(()) };
2941    /// ```
2942    ///
2943    /// [msc3414]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/3414-encrypted-state.md
2944    #[cfg(feature = "experimental-encrypted-state-events")]
2945    #[instrument(skip_all)]
2946    pub fn send_state_event<'a>(
2947        &'a self,
2948        content: impl StateEventContent<StateKey = EmptyStateKey>,
2949    ) -> SendStateEvent<'a> {
2950        self.send_state_event_for_key(&EmptyStateKey, content)
2951    }
2952
2953    /// Send a state event to the homeserver.
2954    ///
2955    /// Returns the parsed response from the server.
2956    ///
2957    /// # Arguments
2958    ///
2959    /// * `content` - The content of the state event.
2960    ///
2961    /// * `state_key` - A unique key which defines the overwriting semantics for
2962    ///   this piece of room state.
2963    ///
2964    /// # Examples
2965    ///
2966    /// ```no_run
2967    /// # use serde::{Deserialize, Serialize};
2968    /// # async {
2969    /// # let joined_room: matrix_sdk::Room = todo!();
2970    /// use matrix_sdk::ruma::{
2971    ///     events::{
2972    ///         macros::EventContent,
2973    ///         room::member::{RoomMemberEventContent, MembershipState},
2974    ///     },
2975    ///     mxc_uri,
2976    /// };
2977    ///
2978    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
2979    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
2980    /// content.avatar_url = Some(avatar_url);
2981    ///
2982    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
2983    ///
2984    /// // Custom event:
2985    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2986    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
2987    /// struct XxxStateEventContent { /* fields... */ }
2988    ///
2989    /// let content: XxxStateEventContent = todo!();
2990    /// joined_room.send_state_event_for_key("foo", content).await?;
2991    /// # anyhow::Ok(()) };
2992    /// ```
2993    #[cfg(not(feature = "experimental-encrypted-state-events"))]
2994    pub async fn send_state_event_for_key<C, K>(
2995        &self,
2996        state_key: &K,
2997        content: C,
2998    ) -> Result<send_state_event::v3::Response>
2999    where
3000        C: StateEventContent,
3001        C::StateKey: Borrow<K>,
3002        K: AsRef<str> + ?Sized,
3003    {
3004        self.ensure_room_joined()?;
3005        let request =
3006            send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3007        let response = self.client.send(request).await?;
3008        Ok(response)
3009    }
3010
3011    /// Send a state event to the homeserver. If state encryption is enabled in
3012    /// this room, the event will be encrypted.
3013    ///
3014    /// If the experimental state event encryption feature is enabled, this
3015    /// method will transparently encrypt the event if this room is
3016    /// encrypted (except if the event type is considered critical for the room
3017    /// to function, as outlined in [MSC3414][msc3414]).
3018    ///
3019    /// Returns the parsed response from the server.
3020    ///
3021    /// # Arguments
3022    ///
3023    /// * `content` - The content of the state event.
3024    ///
3025    /// * `state_key` - A unique key which defines the overwriting semantics for
3026    ///   this piece of room state.
3027    ///
3028    /// # Examples
3029    ///
3030    /// ```no_run
3031    /// # use serde::{Deserialize, Serialize};
3032    /// # async {
3033    /// # let joined_room: matrix_sdk::Room = todo!();
3034    /// use matrix_sdk::ruma::{
3035    ///     events::{
3036    ///         macros::EventContent,
3037    ///         room::member::{RoomMemberEventContent, MembershipState},
3038    ///     },
3039    ///     mxc_uri,
3040    /// };
3041    ///
3042    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3043    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3044    /// content.avatar_url = Some(avatar_url);
3045    ///
3046    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3047    ///
3048    /// // Custom event:
3049    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3050    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3051    /// struct XxxStateEventContent { /* fields... */ }
3052    ///
3053    /// let content: XxxStateEventContent = todo!();
3054    /// joined_room.send_state_event_for_key("foo", content).await?;
3055    /// # anyhow::Ok(()) };
3056    /// ```
3057    ///
3058    /// [msc3414]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/3414-encrypted-state.md
    #[cfg(feature = "experimental-encrypted-state-events")]
    pub fn send_state_event_for_key<'a, C, K>(
        &'a self,
        state_key: &K,
        content: C,
    ) -> SendStateEvent<'a>
    where
        C: StateEventContent,
        C::StateKey: Borrow<K>,
        K: AsRef<str> + ?Sized,
    {
        // This only constructs the `SendStateEvent` and hands it back to the
        // caller; the returned value borrows `self` for `'a`.
        SendStateEvent::new(self, state_key, content)
    }
3072
3073    /// Send a raw room state event to the homeserver.
3074    ///
3075    /// Returns the parsed response from the server.
3076    ///
3077    /// # Arguments
3078    ///
3079    /// * `event_type` - The type of the event that we're sending out.
3080    ///
    /// * `state_key` - A unique key which defines the overwriting semantics for
    ///   this piece of room state. This value is often a zero-length string.
3083    ///
3084    /// * `content` - The content of the event as a raw JSON value. The argument
3085    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3086    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3087    ///
3088    /// # Examples
3089    ///
3090    /// ```no_run
3091    /// use serde_json::json;
3092    ///
3093    /// # async {
3094    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3095    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3096    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3097    ///
3098    /// if let Some(room) = client.get_room(&room_id) {
3099    ///     room.send_state_event_raw("m.room.member", "", json!({
3100    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3101    ///         "displayname": "Alice Margatroid",
3102    ///         "membership": "join",
3103    ///     })).await?;
3104    /// }
3105    /// # anyhow::Ok(()) };
3106    /// ```
3107    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3108    #[instrument(skip_all)]
3109    pub async fn send_state_event_raw(
3110        &self,
3111        event_type: &str,
3112        state_key: &str,
3113        content: impl IntoRawStateEventContent,
3114    ) -> Result<send_state_event::v3::Response> {
3115        self.ensure_room_joined()?;
3116
3117        let request = send_state_event::v3::Request::new_raw(
3118            self.room_id().to_owned(),
3119            event_type.into(),
3120            state_key.to_owned(),
3121            content.into_raw_state_event_content(),
3122        );
3123
3124        Ok(self.client.send(request).await?)
3125    }
3126
    /// Send a raw room state event to the homeserver.
    ///
    /// If the experimental state event encryption feature is enabled, this
    /// method will transparently encrypt the event if this room is
    /// encrypted (except if the event type is considered critical for the room
    /// to function, as outlined in [MSC3414][msc3414]).
    ///
    /// Returns a [`SendRawStateEvent`] request built from the given arguments,
    /// which the example below awaits to send the event.
    ///
    /// # Arguments
    ///
    /// * `event_type` - The type of the event that we're sending out.
    ///
    /// * `state_key` - A unique key which defines the overwriting semantics for
    ///   this piece of room state. This value is often a zero-length string.
    ///
    /// * `content` - The content of the event as a raw JSON value. The argument
    ///   type can be `serde_json::Value`, but also other raw JSON types; for
    ///   the full list check the documentation of [`IntoRawStateEventContent`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use serde_json::json;
    ///
    /// # async {
    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
    ///
    /// if let Some(room) = client.get_room(&room_id) {
    ///     room.send_state_event_raw("m.room.member", "", json!({
    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
    ///         "displayname": "Alice Margatroid",
    ///         "membership": "join",
    ///     })).await?;
    /// }
    /// # anyhow::Ok(()) };
    /// ```
    ///
    /// [msc3414]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/3414-encrypted-state.md
    #[cfg(feature = "experimental-encrypted-state-events")]
    #[instrument(skip_all)]
    pub fn send_state_event_raw<'a>(
        &'a self,
        event_type: &'a str,
        state_key: &'a str,
        content: impl IntoRawStateEventContent,
    ) -> SendRawStateEvent<'a> {
        SendRawStateEvent::new(self, event_type, state_key, content)
    }
3178
3179    /// Strips all information out of an event of the room.
3180    ///
3181    /// Returns the [`redact_event::v3::Response`] from the server.
3182    ///
3183    /// This cannot be undone. Users may redact their own events, and any user
3184    /// with a power level greater than or equal to the redact power level of
3185    /// the room may redact events there.
3186    ///
3187    /// # Arguments
3188    ///
3189    /// * `event_id` - The ID of the event to redact
3190    ///
3191    /// * `reason` - The reason for the event being redacted.
3192    ///
3193    /// * `txn_id` - A unique ID that can be attached to this event as
3194    /// its transaction ID. If not given one is created for the message.
3195    ///
3196    /// # Examples
3197    ///
3198    /// ```no_run
3199    /// use matrix_sdk::ruma::event_id;
3200    ///
3201    /// # async {
3202    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3203    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3204    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3205    /// #
3206    /// if let Some(room) = client.get_room(&room_id) {
3207    ///     let event_id = event_id!("$xxxxxx:example.org");
3208    ///     let reason = Some("Indecent material");
3209    ///     room.redact(&event_id, reason, None).await?;
3210    /// }
3211    /// # anyhow::Ok(()) };
3212    /// ```
3213    #[instrument(skip_all)]
3214    pub async fn redact(
3215        &self,
3216        event_id: &EventId,
3217        reason: Option<&str>,
3218        txn_id: Option<OwnedTransactionId>,
3219    ) -> HttpResult<redact_event::v3::Response> {
3220        let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3221        let request = assign!(
3222            redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3223            { reason: reason.map(ToOwned::to_owned) }
3224        );
3225
3226        self.client.send(request).await
3227    }
3228
3229    /// Get a list of servers that should know this room.
3230    ///
3231    /// Uses the synced members of the room and the suggested [routing
3232    /// algorithm] from the Matrix spec.
3233    ///
3234    /// Returns at most three servers.
3235    ///
3236    /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3237    pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3238        let acl_ev = self
3239            .get_state_event_static::<RoomServerAclEventContent>()
3240            .await?
3241            .and_then(|ev| ev.deserialize().ok());
3242        let acl = acl_ev.as_ref().and_then(|ev| match ev {
3243            SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3244            SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3245        });
3246
3247        // Filter out server names that:
3248        // - Are blocked due to server ACLs
3249        // - Are IP addresses
3250        let members: Vec<_> = self
3251            .members_no_sync(RoomMemberships::JOIN)
3252            .await?
3253            .into_iter()
3254            .filter(|member| {
3255                let server = member.user_id().server_name();
3256                acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3257            })
3258            .collect();
3259
3260        // Get the server of the highest power level user in the room, provided
3261        // they are at least power level 50.
3262        let max = members
3263            .iter()
3264            .max_by_key(|member| member.power_level())
3265            .filter(|max| max.power_level() >= int!(50))
3266            .map(|member| member.user_id().server_name());
3267
3268        // Sort the servers by population.
3269        let servers = members
3270            .iter()
3271            .map(|member| member.user_id().server_name())
3272            .filter(|server| max.filter(|max| max == server).is_none())
3273            .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3274                *servers.entry(server).or_default() += 1;
3275                servers
3276            });
3277        let mut servers: Vec<_> = servers.into_iter().collect();
3278        servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3279
3280        Ok(max
3281            .into_iter()
3282            .chain(servers.into_iter().map(|(name, _)| name))
3283            .take(3)
3284            .map(ToOwned::to_owned)
3285            .collect())
3286    }
3287
3288    /// Get a `matrix.to` permalink to this room.
3289    ///
3290    /// If this room has an alias, we use it. Otherwise, we try to use the
3291    /// synced members in the room for [routing] the room ID.
3292    ///
3293    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3294    pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3295        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3296            return Ok(alias.matrix_to_uri());
3297        }
3298
3299        let via = self.route().await?;
3300        Ok(self.room_id().matrix_to_uri_via(via))
3301    }
3302
3303    /// Get a `matrix:` permalink to this room.
3304    ///
3305    /// If this room has an alias, we use it. Otherwise, we try to use the
3306    /// synced members in the room for [routing] the room ID.
3307    ///
3308    /// # Arguments
3309    ///
3310    /// * `join` - Whether the user should join the room.
3311    ///
3312    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3313    pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3314        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3315            return Ok(alias.matrix_uri(join));
3316        }
3317
3318        let via = self.route().await?;
3319        Ok(self.room_id().matrix_uri_via(via, join))
3320    }
3321
3322    /// Get a `matrix.to` permalink to an event in this room.
3323    ///
3324    /// We try to use the synced members in the room for [routing] the room ID.
3325    ///
3326    /// *Note*: This method does not check if the given event ID is actually
3327    /// part of this room. It needs to be checked before calling this method
3328    /// otherwise the permalink won't work.
3329    ///
3330    /// # Arguments
3331    ///
3332    /// * `event_id` - The ID of the event.
3333    ///
3334    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3335    pub async fn matrix_to_event_permalink(
3336        &self,
3337        event_id: impl Into<OwnedEventId>,
3338    ) -> Result<MatrixToUri> {
3339        // Don't use the alias because an event is tied to a room ID, but an
3340        // alias might point to another room, e.g. after a room upgrade.
3341        let via = self.route().await?;
3342        Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3343    }
3344
3345    /// Get a `matrix:` permalink to an event in this room.
3346    ///
3347    /// We try to use the synced members in the room for [routing] the room ID.
3348    ///
3349    /// *Note*: This method does not check if the given event ID is actually
3350    /// part of this room. It needs to be checked before calling this method
3351    /// otherwise the permalink won't work.
3352    ///
3353    /// # Arguments
3354    ///
3355    /// * `event_id` - The ID of the event.
3356    ///
3357    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3358    pub async fn matrix_event_permalink(
3359        &self,
3360        event_id: impl Into<OwnedEventId>,
3361    ) -> Result<MatrixUri> {
3362        // Don't use the alias because an event is tied to a room ID, but an
3363        // alias might point to another room, e.g. after a room upgrade.
3364        let via = self.route().await?;
3365        Ok(self.room_id().matrix_event_uri_via(event_id, via))
3366    }
3367
3368    /// Get the latest receipt of a user in this room.
3369    ///
3370    /// # Arguments
3371    ///
3372    /// * `receipt_type` - The type of receipt to get.
3373    ///
3374    /// * `thread` - The thread containing the event of the receipt, if any.
3375    ///
3376    /// * `user_id` - The ID of the user.
3377    ///
3378    /// Returns the ID of the event on which the receipt applies and the
3379    /// receipt.
3380    pub async fn load_user_receipt(
3381        &self,
3382        receipt_type: ReceiptType,
3383        thread: ReceiptThread,
3384        user_id: &UserId,
3385    ) -> Result<Option<(OwnedEventId, Receipt)>> {
3386        self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3387    }
3388
3389    /// Load the receipts for an event in this room from storage.
3390    ///
3391    /// # Arguments
3392    ///
3393    /// * `receipt_type` - The type of receipt to get.
3394    ///
3395    /// * `thread` - The thread containing the event of the receipt, if any.
3396    ///
3397    /// * `event_id` - The ID of the event.
3398    ///
3399    /// Returns a list of IDs of users who have sent a receipt for the event and
3400    /// the corresponding receipts.
3401    pub async fn load_event_receipts(
3402        &self,
3403        receipt_type: ReceiptType,
3404        thread: ReceiptThread,
3405        event_id: &EventId,
3406    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3407        self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3408    }
3409
3410    /// Get the push-condition context for this room.
3411    ///
3412    /// Returns `None` if some data couldn't be found. This should only happen
3413    /// in brand new rooms, while we process its state.
3414    pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3415        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3416    }
3417
3418    /// Get the push-condition context for this room, with a choice to include
3419    /// thread subscriptions or not, based on the extra
3420    /// `with_threads_subscriptions` parameter.
3421    ///
3422    /// Returns `None` if some data couldn't be found. This should only happen
3423    /// in brand new rooms, while we process its state.
3424    pub(crate) async fn push_condition_room_ctx_internal(
3425        &self,
3426        with_threads_subscriptions: bool,
3427    ) -> Result<Option<PushConditionRoomCtx>> {
3428        let room_id = self.room_id();
3429        let user_id = self.own_user_id();
3430        let room_info = self.clone_info();
3431        let member_count = room_info.active_members_count();
3432
3433        let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3434            member.name().to_owned()
3435        } else {
3436            return Ok(None);
3437        };
3438
3439        let power_levels = match self.power_levels().await {
3440            Ok(power_levels) => Some(power_levels.into()),
3441            Err(error) => {
3442                if matches!(room_info.state(), RoomState::Joined) {
3443                    // It's normal to not have the power levels in a non-joined room, so don't log
3444                    // the error if the room is not joined
3445                    error!("Could not compute power levels for push conditions: {error}");
3446                }
3447                None
3448            }
3449        };
3450
3451        let mut ctx = assign!(PushConditionRoomCtx::new(
3452            room_id.to_owned(),
3453            UInt::new(member_count).unwrap_or(UInt::MAX),
3454            user_id.to_owned(),
3455            user_display_name,
3456        ),
3457        {
3458            power_levels,
3459        });
3460
3461        if with_threads_subscriptions {
3462            let this = self.clone();
3463            ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3464                let room = this.clone();
3465                Box::pin(async move {
3466                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3467                        maybe_sub.is_some()
3468                    } else {
3469                        false
3470                    }
3471                })
3472            });
3473        }
3474
3475        Ok(Some(ctx))
3476    }
3477
3478    /// Retrieves a [`PushContext`] that can be used to compute the push
3479    /// actions for events.
3480    pub async fn push_context(&self) -> Result<Option<PushContext>> {
3481        self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3482    }
3483
3484    /// Retrieves a [`PushContext`] that can be used to compute the push actions
3485    /// for events, with a choice to include thread subscriptions or not,
3486    /// based on the extra `with_threads_subscriptions` parameter.
3487    #[instrument(skip(self))]
3488    pub(crate) async fn push_context_internal(
3489        &self,
3490        with_threads_subscriptions: bool,
3491    ) -> Result<Option<PushContext>> {
3492        let Some(push_condition_room_ctx) =
3493            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3494        else {
3495            debug!("Could not aggregate push context");
3496            return Ok(None);
3497        };
3498        let push_rules = self.client().account().push_rules().await?;
3499        Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3500    }
3501
3502    /// Get the push actions for the given event with the current room state.
3503    ///
3504    /// Note that it is possible that no push action is returned because the
3505    /// current room state does not have all the required state events.
3506    pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3507        if let Some(ctx) = self.push_context().await? {
3508            Ok(Some(ctx.for_event(event).await))
3509        } else {
3510            Ok(None)
3511        }
3512    }
3513
3514    /// The membership details of the (latest) invite for the logged-in user in
3515    /// this room.
3516    pub async fn invite_details(&self) -> Result<Invite> {
3517        let state = self.state();
3518
3519        if state != RoomState::Invited {
3520            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3521        }
3522
3523        let invitee = self
3524            .get_member_no_sync(self.own_user_id())
3525            .await?
3526            .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3527        let event = invitee.event();
3528        let inviter_id = event.sender();
3529        let inviter = self.get_member_no_sync(inviter_id).await?;
3530        Ok(Invite { invitee, inviter })
3531    }
3532
3533    /// Get the membership details for the current user.
3534    ///
3535    /// Returns:
3536    ///     - If the user was present in the room, a
3537    ///       [`RoomMemberWithSenderInfo`] containing both the user info and the
3538    ///       member info of the sender of the `m.room.member` event.
3539    ///     - If the current user is not present, an error.
3540    pub async fn member_with_sender_info(
3541        &self,
3542        user_id: &UserId,
3543    ) -> Result<RoomMemberWithSenderInfo> {
3544        let Some(member) = self.get_member_no_sync(user_id).await? else {
3545            return Err(Error::InsufficientData);
3546        };
3547
3548        let sender_member =
3549            if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3550                // If the sender room member info is already available, return it
3551                Some(member)
3552            } else if self.are_members_synced() {
3553                // The room members are synced and we couldn't find the sender info
3554                None
3555            } else if self.sync_members().await.is_ok() {
3556                // Try getting the sender room member info again after syncing
3557                self.get_member_no_sync(member.event().sender()).await?
3558            } else {
3559                None
3560            };
3561
3562        Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3563    }
3564
3565    /// Forget this room.
3566    ///
3567    /// This communicates to the homeserver that it should forget the room.
3568    ///
3569    /// Only left or banned-from rooms can be forgotten.
3570    pub async fn forget(&self) -> Result<()> {
3571        let state = self.state();
3572        match state {
3573            RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3574                return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3575                    "Left / Banned",
3576                    state,
3577                ))));
3578            }
3579            RoomState::Left | RoomState::Banned => {}
3580        }
3581
3582        let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3583        let _response = self.client.send(request).await?;
3584
3585        // If it was a DM, remove the room from the `m.direct` global account data.
3586        if self.inner.direct_targets_length() != 0 {
3587            if let Err(e) = self.set_is_direct(false).await {
3588                // It is not important whether we managed to remove the room, it will not have
3589                // any consequences, so just log the error.
3590                warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3591            }
3592        }
3593
3594        self.client.base_client().forget_room(self.inner.room_id()).await?;
3595
3596        Ok(())
3597    }
3598
3599    fn ensure_room_joined(&self) -> Result<()> {
3600        let state = self.state();
3601        if state == RoomState::Joined {
3602            Ok(())
3603        } else {
3604            Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3605        }
3606    }
3607
3608    /// Get the notification mode.
3609    pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3610        if !matches!(self.state(), RoomState::Joined) {
3611            return None;
3612        }
3613
3614        let notification_settings = self.client().notification_settings().await;
3615
3616        // Get the user-defined mode if available
3617        let notification_mode =
3618            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3619
3620        if notification_mode.is_some() {
3621            notification_mode
3622        } else if let Ok(is_encrypted) =
3623            self.latest_encryption_state().await.map(|state| state.is_encrypted())
3624        {
3625            // Otherwise, if encrypted status is available, get the default mode for this
3626            // type of room.
3627            // From the point of view of notification settings, a `one-to-one` room is one
3628            // that involves exactly two people.
3629            let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3630            let default_mode = notification_settings
3631                .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3632                .await;
3633            Some(default_mode)
3634        } else {
3635            None
3636        }
3637    }
3638
3639    /// Get the user-defined notification mode.
3640    ///
3641    /// The result is cached for fast and non-async call. To read the cached
3642    /// result, use
3643    /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3644    //
3645    // Note for maintainers:
3646    //
3647    // The fact the result is cached is an important property. If you change that in
3648    // the future, please review all calls to this method.
3649    pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3650        if !matches!(self.state(), RoomState::Joined) {
3651            return None;
3652        }
3653
3654        let notification_settings = self.client().notification_settings().await;
3655
3656        // Get the user-defined mode if available.
3657        let mode =
3658            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3659
3660        if let Some(mode) = mode {
3661            self.update_cached_user_defined_notification_mode(mode);
3662        }
3663
3664        mode
3665    }
3666
3667    /// Report an event as inappropriate to the homeserver's administrator.
3668    ///
3669    /// # Arguments
3670    ///
3671    /// * `event_id` - The ID of the event to report.
3672    /// * `score` - The score to rate this content.
3673    /// * `reason` - The reason the content is being reported.
3674    ///
3675    /// # Errors
3676    ///
3677    /// Returns an error if the room is not joined or if an error occurs with
3678    /// the request.
3679    pub async fn report_content(
3680        &self,
3681        event_id: OwnedEventId,
3682        score: Option<ReportedContentScore>,
3683        reason: Option<String>,
3684    ) -> Result<report_content::v3::Response> {
3685        let state = self.state();
3686        if state != RoomState::Joined {
3687            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3688        }
3689
3690        let request = report_content::v3::Request::new(
3691            self.inner.room_id().to_owned(),
3692            event_id,
3693            score.map(Into::into),
3694            reason,
3695        );
3696        Ok(self.client.send(request).await?)
3697    }
3698
3699    /// Reports a room as inappropriate to the server.
3700    /// The caller is not required to be joined to the room to report it.
3701    ///
3702    /// # Arguments
3703    ///
3704    /// * `reason` - The reason the room is being reported.
3705    ///
3706    /// # Errors
3707    ///
3708    /// Returns an error if the room is not found or on rate limit
3709    pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3710        let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3711
3712        Ok(self.client.send(request).await?)
3713    }
3714
3715    /// Set a flag on the room to indicate that the user has explicitly marked
3716    /// it as (un)read.
3717    ///
3718    /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3719    /// value as `unread`.
3720    pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3721        if self.is_marked_unread() == unread {
3722            // The request is not necessary.
3723            return Ok(());
3724        }
3725
3726        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3727
3728        let content = MarkedUnreadEventContent::new(unread);
3729
3730        let request = set_room_account_data::v3::Request::new(
3731            user_id.to_owned(),
3732            self.inner.room_id().to_owned(),
3733            &content,
3734        )?;
3735
3736        self.client.send(request).await?;
3737        Ok(())
3738    }
3739
3740    /// Returns the [`RoomEventCache`] associated to this room, assuming the
3741    /// global [`EventCache`] has been enabled for subscription.
3742    pub async fn event_cache(
3743        &self,
3744    ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3745        self.client.event_cache().for_room(self.room_id()).await
3746    }
3747
3748    /// Get the beacon information event in the room for the `user_id`.
3749    ///
3750    /// # Errors
3751    ///
3752    /// Returns an error if the event is redacted, stripped, not found or could
3753    /// not be deserialized.
3754    pub(crate) async fn get_user_beacon_info(
3755        &self,
3756        user_id: &UserId,
3757    ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3758        let raw_event = self
3759            .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3760            .await?
3761            .ok_or(BeaconError::NotFound)?;
3762
3763        match raw_event.deserialize()? {
3764            SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3765            SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3766            SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3767        }
3768    }
3769
3770    /// Start sharing live location in the room.
3771    ///
3772    /// # Arguments
3773    ///
3774    /// * `duration_millis` - The duration for which the live location is
3775    ///   shared, in milliseconds.
3776    /// * `description` - An optional description for the live location share.
3777    ///
3778    /// # Errors
3779    ///
3780    /// Returns an error if the room is not joined or if the state event could
3781    /// not be sent.
3782    pub async fn start_live_location_share(
3783        &self,
3784        duration_millis: u64,
3785        description: Option<String>,
3786    ) -> Result<send_state_event::v3::Response> {
3787        self.ensure_room_joined()?;
3788
3789        self.send_state_event_for_key(
3790            self.own_user_id(),
3791            BeaconInfoEventContent::new(
3792                description,
3793                Duration::from_millis(duration_millis),
3794                true,
3795                None,
3796            ),
3797        )
3798        .await
3799    }
3800
3801    /// Stop sharing live location in the room.
3802    ///
3803    /// # Errors
3804    ///
3805    /// Returns an error if the room is not joined, if the beacon information
3806    /// is redacted or stripped, or if the state event is not found.
3807    pub async fn stop_live_location_share(
3808        &self,
3809    ) -> Result<send_state_event::v3::Response, BeaconError> {
3810        self.ensure_room_joined()?;
3811
3812        let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3813        beacon_info_event.content.stop();
3814        Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3815    }
3816
3817    /// Send a location beacon event in the current room.
3818    ///
3819    /// # Arguments
3820    ///
3821    /// * `geo_uri` - The geo URI of the location beacon.
3822    ///
3823    /// # Errors
3824    ///
3825    /// Returns an error if the room is not joined, if the beacon information
3826    /// is redacted or stripped, if the location share is no longer live,
3827    /// or if the state event is not found.
3828    pub async fn send_location_beacon(
3829        &self,
3830        geo_uri: String,
3831    ) -> Result<send_message_event::v3::Response, BeaconError> {
3832        self.ensure_room_joined()?;
3833
3834        let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3835
3836        if beacon_info_event.content.is_live() {
3837            let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3838            Ok(self.send(content).await?)
3839        } else {
3840            Err(BeaconError::NotLive)
3841        }
3842    }
3843
3844    /// Store the given `ComposerDraft` in the state store using the current
3845    /// room id and optional thread root id as identifier.
3846    pub async fn save_composer_draft(
3847        &self,
3848        draft: ComposerDraft,
3849        thread_root: Option<&EventId>,
3850    ) -> Result<()> {
3851        self.client
3852            .state_store()
3853            .set_kv_data(
3854                StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3855                StateStoreDataValue::ComposerDraft(draft),
3856            )
3857            .await?;
3858        Ok(())
3859    }
3860
3861    /// Retrieve the `ComposerDraft` stored in the state store for this room
3862    /// and given thread, if any.
3863    pub async fn load_composer_draft(
3864        &self,
3865        thread_root: Option<&EventId>,
3866    ) -> Result<Option<ComposerDraft>> {
3867        let data = self
3868            .client
3869            .state_store()
3870            .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3871            .await?;
3872        Ok(data.and_then(|d| d.into_composer_draft()))
3873    }
3874
3875    /// Remove the `ComposerDraft` stored in the state store for this room
3876    /// and given thread, if any.
3877    pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3878        self.client
3879            .state_store()
3880            .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3881            .await?;
3882        Ok(())
3883    }
3884
3885    /// Load pinned state events for a room from the `/state` endpoint in the
3886    /// home server.
3887    pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3888        let response = self
3889            .client
3890            .send(get_state_event_for_key::v3::Request::new(
3891                self.room_id().to_owned(),
3892                StateEventType::RoomPinnedEvents,
3893                "".to_owned(),
3894            ))
3895            .await;
3896
3897        match response {
3898            Ok(response) => Ok(Some(
3899                response
3900                    .into_content()
3901                    .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3902                    .pinned,
3903            )),
3904            Err(http_error) => match http_error.as_client_api_error() {
3905                Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3906                _ => Err(http_error.into()),
3907            },
3908        }
3909    }
3910
3911    /// Observe live location sharing events for this room.
3912    ///
3913    /// The returned observable will receive the newest event for each sync
3914    /// response that contains an `m.beacon` event.
3915    ///
3916    /// Returns a stream of [`ObservableLiveLocation`] events from other users
3917    /// in the room, excluding the live location events of the room's own user.
3918    pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3919        ObservableLiveLocation::new(&self.client, self.room_id())
3920    }
3921
3922    /// Subscribe to knock requests in this `Room`.
3923    ///
3924    /// The current requests to join the room will be emitted immediately
3925    /// when subscribing.
3926    ///
3927    /// A new set of knock requests will be emitted whenever:
3928    /// - A new member event is received.
3929    /// - A knock request is marked as seen.
3930    /// - A sync is gappy (limited), so room membership information may be
3931    ///   outdated.
3932    ///
3933    /// Returns both a stream of knock requests and a handle for a task that
3934    /// will clean up the seen knock request ids when possible.
3935    pub async fn subscribe_to_knock_requests(
3936        &self,
3937    ) -> Result<(impl Stream<Item = Vec<KnockRequest>>, JoinHandle<()>)> {
3938        let this = Arc::new(self.clone());
3939
3940        let room_member_events_observer =
3941            self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3942
3943        let current_seen_ids = self.get_seen_knock_request_ids().await?;
3944        let mut seen_request_ids_stream = self
3945            .seen_knock_request_ids_map
3946            .subscribe()
3947            .await
3948            .map(|values| values.unwrap_or_default());
3949
3950        let mut room_info_stream = self.subscribe_info();
3951
3952        // Spawn a task that will clean up the seen knock request ids when updated room
3953        // members are received
3954        let clear_seen_ids_handle = spawn({
3955            let this = self.clone();
3956            async move {
3957                let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3958                while member_updates_stream.recv().await.is_ok() {
3959                    // If room members were updated, try to remove outdated seen knock request ids
3960                    if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3961                        warn!("Failed to remove seen knock requests: {err}")
3962                    }
3963                }
3964            }
3965        });
3966
3967        let combined_stream = stream! {
3968            // Emit current requests to join
3969            match this.get_current_join_requests(&current_seen_ids).await {
3970                Ok(initial_requests) => yield initial_requests,
3971                Err(err) => warn!("Failed to get initial requests to join: {err}")
3972            }
3973
3974            let mut requests_stream = room_member_events_observer.subscribe();
3975            let mut seen_ids = current_seen_ids.clone();
3976
3977            loop {
3978                // This is equivalent to a combine stream operation, triggering a new emission
3979                // when any of the branches changes
3980                tokio::select! {
3981                    Some((event, _)) = requests_stream.next() => {
3982                        if let Some(event) = event.as_original() {
3983                            // If we can calculate the membership change, try to emit only when needed
3984                            let emit = if event.prev_content().is_some() {
3985                                matches!(event.membership_change(),
3986                                    MembershipChange::Banned |
3987                                    MembershipChange::Knocked |
3988                                    MembershipChange::KnockAccepted |
3989                                    MembershipChange::KnockDenied |
3990                                    MembershipChange::KnockRetracted
3991                                )
3992                            } else {
3993                                // If we can't calculate the membership change, assume we need to
3994                                // emit updated values
3995                                true
3996                            };
3997
3998                            if emit {
3999                                match this.get_current_join_requests(&seen_ids).await {
4000                                    Ok(requests) => yield requests,
4001                                    Err(err) => {
4002                                        warn!("Failed to get updated knock requests on new member event: {err}")
4003                                    }
4004                                }
4005                            }
4006                        }
4007                    }
4008
4009                    Some(new_seen_ids) = seen_request_ids_stream.next() => {
4010                        // Update the current seen ids
4011                        seen_ids = new_seen_ids;
4012
4013                        // If seen requests have changed we need to recalculate
4014                        // all the knock requests
4015                        match this.get_current_join_requests(&seen_ids).await {
4016                            Ok(requests) => yield requests,
4017                            Err(err) => {
4018                                warn!("Failed to get updated knock requests on seen ids changed: {err}")
4019                            }
4020                        }
4021                    }
4022
4023                    Some(room_info) = room_info_stream.next() => {
4024                        // We need to emit new items when we may have missing room members:
4025                        // this usually happens after a gappy (limited) sync
4026                        if !room_info.are_members_synced() {
4027                            match this.get_current_join_requests(&seen_ids).await {
4028                                Ok(requests) => yield requests,
4029                                Err(err) => {
4030                                    warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4031                                }
4032                            }
4033                        }
4034                    }
4035                    // If the streams in all branches are closed, stop the loop
4036                    else => break,
4037                }
4038            }
4039        };
4040
4041        Ok((combined_stream, clear_seen_ids_handle))
4042    }
4043
4044    async fn get_current_join_requests(
4045        &self,
4046        seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4047    ) -> Result<Vec<KnockRequest>> {
4048        Ok(self
4049            .members(RoomMemberships::KNOCK)
4050            .await?
4051            .into_iter()
4052            .filter_map(|member| {
4053                let event_id = member.event().event_id()?;
4054                Some(KnockRequest::new(
4055                    self,
4056                    event_id,
4057                    member.event().timestamp(),
4058                    KnockRequestMemberInfo::from_member(&member),
4059                    seen_request_ids.contains_key(event_id),
4060                ))
4061            })
4062            .collect())
4063    }
4064
4065    /// Access the room settings related to privacy and visibility.
4066    pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4067        RoomPrivacySettings::new(&self.inner, &self.client)
4068    }
4069
4070    /// Retrieve a list of all the threads for the current room.
4071    ///
4072    /// Since this client-server API is paginated, the return type may include a
4073    /// token used to resuming back-pagination into the list of results, in
4074    /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4075    /// [`ListThreadsOptions::from`] to continue the pagination
4076    /// from the previous position.
4077    pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4078        let request = opts.into_request(self.room_id());
4079
4080        let response = self.client.send(request).await?;
4081
4082        let push_ctx = self.push_context().await?;
4083        let chunk = join_all(
4084            response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4085        )
4086        .await;
4087
4088        Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4089    }
4090
4091    /// Retrieve a list of relations for the given event, according to the given
4092    /// options.
4093    ///
4094    /// Since this client-server API is paginated, the return type may include a
4095    /// token used to resuming back-pagination into the list of results, in
4096    /// [`Relations::prev_batch_token`]. This token can be fed back into
4097    /// [`RelationsOptions::from`] to continue the pagination from the previous
4098    /// position.
4099    ///
4100    /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4101    /// then it must be used with the same
4102    /// [`RelationsOptions::include_relations`] value as the request that
4103    /// returns the `from` token, otherwise the server behavior is undefined.
4104    pub async fn relations(
4105        &self,
4106        event_id: OwnedEventId,
4107        opts: RelationsOptions,
4108    ) -> Result<Relations> {
4109        opts.send(self, event_id).await
4110    }
4111
4112    /// Search this room's [`RoomIndex`] for query and return at most
4113    /// max_number_of_results results.
4114    #[cfg(feature = "experimental-search")]
4115    pub async fn search(
4116        &self,
4117        query: &str,
4118        max_number_of_results: usize,
4119    ) -> Option<Vec<OwnedEventId>> {
4120        let mut search_index_guard = self.client.search_index().lock().await;
4121        search_index_guard.commit_and_reload(self.room_id());
4122        search_index_guard.search(query, max_number_of_results, self.room_id())
4123    }
4124
4125    /// Subscribe to a given thread in this room.
4126    ///
4127    /// This will subscribe the user to the thread, so that they will receive
4128    /// notifications for that thread specifically.
4129    ///
4130    /// # Arguments
4131    ///
4132    /// - `thread_root`: The ID of the thread root event to subscribe to.
4133    /// - `automatic`: Whether the subscription was made automatically by a
4134    ///   client, not by manual user choice. If set, must include the latest
4135    ///   event ID that's known in the thread and that is causing the automatic
4136    ///   subscription. If unset (i.e. we're now subscribing manually) and there
4137    ///   was a previous automatic subscription, the subscription will be
4138    ///   overridden to a manual one instead.
4139    ///
4140    /// # Returns
4141    ///
4142    /// - A 404 error if the event isn't known, or isn't a thread root.
4143    /// - An `Ok` result if the subscription was successful, or if the server
4144    ///   skipped an automatic subscription (as the user unsubscribed from the
4145    ///   thread after the event causing the automatic subscription).
4146    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4147    pub async fn subscribe_thread(
4148        &self,
4149        thread_root: OwnedEventId,
4150        automatic: Option<OwnedEventId>,
4151    ) -> Result<()> {
4152        let is_automatic = automatic.is_some();
4153
4154        match self
4155            .client
4156            .send(subscribe_thread::unstable::Request::new(
4157                self.room_id().to_owned(),
4158                thread_root.clone(),
4159                automatic,
4160            ))
4161            .await
4162        {
4163            Ok(_response) => {
4164                trace!("Server acknowledged the thread subscription; saving in db");
4165                // Immediately save the result into the database.
4166                self.client
4167                    .state_store()
4168                    .upsert_thread_subscription(
4169                        self.room_id(),
4170                        &thread_root,
4171                        ThreadSubscription { automatic: is_automatic },
4172                    )
4173                    .await?;
4174
4175                Ok(())
4176            }
4177
4178            Err(err) => {
4179                if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4180                    // In this case: the server indicates that the user unsubscribed *after* the
4181                    // event ID we've used in an automatic subscription; don't
4182                    // save the subscription state in the database, as the
4183                    // previous one should be more correct.
4184                    trace!("Thread subscription skipped: {err}");
4185                    Ok(())
4186                } else {
4187                    // Forward the error to the caller.
4188                    Err(err.into())
4189                }
4190            }
4191        }
4192    }
4193
4194    /// Subscribe to a thread if needed, based on a current subscription to it.
4195    ///
4196    /// This is like [`Self::subscribe_thread`], but it first checks if the user
4197    /// has already subscribed to a thread, so as to minimize sending
4198    /// unnecessary subscriptions which would be ignored by the server.
4199    pub async fn subscribe_thread_if_needed(
4200        &self,
4201        thread_root: &EventId,
4202        automatic: Option<OwnedEventId>,
4203    ) -> Result<()> {
4204        if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4205            // If we have a previous subscription, we should only send the new one if it's
4206            // manual and the previous one was automatic.
4207            if !prev_sub.automatic || automatic.is_some() {
4208                return Ok(());
4209            }
4210        }
4211        self.subscribe_thread(thread_root.to_owned(), automatic).await
4212    }
4213
4214    /// Unsubscribe from a given thread in this room.
4215    ///
4216    /// # Arguments
4217    ///
4218    /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4219    ///
4220    /// # Returns
4221    ///
4222    /// - An `Ok` result if the unsubscription was successful, or the thread was
4223    ///   already unsubscribed.
4224    /// - A 404 error if the event isn't known, or isn't a thread root.
4225    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4226    pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4227        self.client
4228            .send(unsubscribe_thread::unstable::Request::new(
4229                self.room_id().to_owned(),
4230                thread_root.clone(),
4231            ))
4232            .await?;
4233
4234        trace!("Server acknowledged the thread subscription removal; removed it from db too");
4235
4236        // Immediately save the result into the database.
4237        self.client.state_store().remove_thread_subscription(self.room_id(), &thread_root).await?;
4238
4239        Ok(())
4240    }
4241
4242    /// Return the current thread subscription for the given thread root in this
4243    /// room.
4244    ///
4245    /// # Arguments
4246    ///
4247    /// - `thread_root`: The ID of the thread root event to get the subscription
4248    ///   for.
4249    ///
4250    /// # Returns
4251    ///
4252    /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4253    ///   subscription information.
4254    /// - An `Ok` result with `None` if the subscription does not exist, or the
4255    ///   event couldn't be found, or the event isn't a thread.
4256    /// - An error if the request fails for any other reason, such as a network
4257    ///   error.
4258    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4259    pub async fn fetch_thread_subscription(
4260        &self,
4261        thread_root: OwnedEventId,
4262    ) -> Result<Option<ThreadSubscription>> {
4263        let result = self
4264            .client
4265            .send(get_thread_subscription::unstable::Request::new(
4266                self.room_id().to_owned(),
4267                thread_root.clone(),
4268            ))
4269            .await;
4270
4271        let subscription = match result {
4272            Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4273            Err(http_error) => match http_error.as_client_api_error() {
4274                Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4275                _ => return Err(http_error.into()),
4276            },
4277        };
4278
4279        // Keep the database in sync.
4280        if let Some(sub) = &subscription {
4281            self.client
4282                .state_store()
4283                .upsert_thread_subscription(self.room_id(), &thread_root, *sub)
4284                .await?;
4285        } else {
4286            // If the subscription was not found, remove it from the database.
4287            self.client
4288                .state_store()
4289                .remove_thread_subscription(self.room_id(), &thread_root)
4290                .await?;
4291        }
4292
4293        Ok(subscription)
4294    }
4295
4296    /// Return the current thread subscription for the given thread root in this
4297    /// room, by getting it from storage if possible, or fetching it from
4298    /// network otherwise.
4299    ///
4300    /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4301    /// this method.
4302    pub async fn load_or_fetch_thread_subscription(
4303        &self,
4304        thread_root: &EventId,
4305    ) -> Result<Option<ThreadSubscription>> {
4306        // A bit of a lie at the moment, since thread subscriptions are not sync'd yet.
4307        self.fetch_thread_subscription(thread_root.to_owned()).await
4308    }
4309}
4310
#[cfg(feature = "e2e-encryption")]
impl RoomIdentityProvider for Room {
    /// Whether `user_id` resolves to a member of this room.
    ///
    /// A failed lookup counts as "not a member".
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// The identities of the joined and invited members of this room.
    ///
    /// If the members cannot be loaded, the default (empty) member list is
    /// used. Members for which no identity is found are left out.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                // `extend` with an `Option` pushes the identity only if there is one.
                identities.extend(self.user_identity(member.user_id()).await);
            }
            identities
        })
    }

    /// The identity of `user_id`, if the client's encryption module returns
    /// one.
    ///
    /// A failed lookup is reported as `None`.
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            let lookup = self.client.encryption().get_user_identity(user_id).await;
            lookup.ok().flatten().map(|identity| identity.underlying_identity())
        })
    }
}
4345
4346/// A wrapper for a weak client and a room id that allows to lazily retrieve a
4347/// room, only when needed.
4348#[derive(Clone, Debug)]
4349pub(crate) struct WeakRoom {
4350    client: WeakClient,
4351    room_id: OwnedRoomId,
4352}
4353
4354impl WeakRoom {
4355    /// Create a new `WeakRoom` given its weak components.
4356    pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4357        Self { client, room_id }
4358    }
4359
4360    /// Attempts to reconstruct the room.
4361    pub fn get(&self) -> Option<Room> {
4362        self.client.get().and_then(|client| client.get_room(&self.room_id))
4363    }
4364
4365    /// The room id for that room.
4366    pub fn room_id(&self) -> &RoomId {
4367        &self.room_id
4368    }
4369}
4370
/// Details of the (latest) invite.
#[derive(Debug, Clone)]
pub struct Invite {
    /// The room member who has been invited.
    pub invitee: RoomMember,
    /// The room member who sent the invite, when that member is available.
    pub inviter: Option<RoomMember>,
}
4379
/// Module-private error for invitation handling.
// NOTE(review): presumably raised while resolving the details of an invite —
// confirm against the callers, which are outside this part of the file.
#[derive(Error, Debug)]
enum InvitationError {
    /// No membership event was found.
    #[error("No membership event found")]
    EventMissing,
}
4385
4386/// Receipts to send all at once.
4387#[derive(Debug, Clone, Default)]
4388#[non_exhaustive]
4389pub struct Receipts {
4390    /// Fully-read marker (room account data).
4391    pub fully_read: Option<OwnedEventId>,
4392    /// Read receipt (public ephemeral room event).
4393    pub public_read_receipt: Option<OwnedEventId>,
4394    /// Read receipt (private ephemeral room event).
4395    pub private_read_receipt: Option<OwnedEventId>,
4396}
4397
4398impl Receipts {
4399    /// Create an empty `Receipts`.
4400    pub fn new() -> Self {
4401        Self::default()
4402    }
4403
4404    /// Set the last event the user has read.
4405    ///
4406    /// It means that the user has read all the events before this event.
4407    ///
4408    /// This is a private marker only visible by the user.
4409    ///
4410    /// Note that this is technically not a receipt as it is persisted in the
4411    /// room account data.
4412    pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4413        self.fully_read = event_id.into();
4414        self
4415    }
4416
4417    /// Set the last event presented to the user and forward it to the other
4418    /// users in the room.
4419    ///
4420    /// This is used to reset the unread messages/notification count and
4421    /// advertise to other users the last event that the user has likely seen.
4422    pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4423        self.public_read_receipt = event_id.into();
4424        self
4425    }
4426
4427    /// Set the last event presented to the user and don't forward it.
4428    ///
4429    /// This is used to reset the unread messages/notification count.
4430    pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4431        self.private_read_receipt = event_id.into();
4432        self
4433    }
4434
4435    /// Whether this `Receipts` is empty.
4436    pub fn is_empty(&self) -> bool {
4437        self.fully_read.is_none()
4438            && self.public_read_receipt.is_none()
4439            && self.private_read_receipt.is_none()
4440    }
4441}
4442
/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
/// listed by a room, possibly validated by checking the space's state.
#[derive(Debug)]
pub enum ParentSpace {
    /// The room recognizes the given room as its parent, and the parent
    /// recognizes it as its child.
    Reciprocal(Room),
    /// The room recognizes the given room as its parent, but the parent does
    /// not recognize it as its child. However, the author of the
    /// `m.space.parent` event in the room has a sufficient power level in the
    /// parent to create the child event.
    WithPowerlevel(Room),
    /// The room recognizes the given room as its parent, but the parent does
    /// not recognize it as its child.
    Illegitimate(Room),
    /// The room recognizes the given id as its parent room, but we cannot check
    /// whether the parent recognizes it as its child.
    Unverifiable(OwnedRoomId),
}
4462
/// The score to rate an inappropriate content.
///
/// Must be a value between `0`, inoffensive, and `-100`, very offensive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReportedContentScore(i8);

impl ReportedContentScore {
    /// The smallest value that can be represented by this type.
    ///
    /// This is for very offensive content.
    pub const MIN: Self = Self(-100);

    /// The largest value that can be represented by this type.
    ///
    /// This is for inoffensive content.
    pub const MAX: Self = Self(0);

    /// Try to create a `ReportedContentScore` from the provided `i8`.
    ///
    /// Returns `None` if it is smaller than [`ReportedContentScore::MIN`] or
    /// larger than [`ReportedContentScore::MAX`].
    ///
    /// This accepts exactly the same values as the `TryFrom<i8>`
    /// implementation for `ReportedContentScore`, except that it returns an
    /// `Option` instead of a `Result`, and that it can be used in const
    /// contexts.
    pub const fn new(value: i8) -> Option<Self> {
        // Trait methods cannot be called in a `const fn`, so the range check
        // is done directly on the raw values.
        if value < Self::MIN.0 || value > Self::MAX.0 {
            None
        } else {
            Some(Self(value))
        }
    }

    /// Create a `ReportedContentScore` from the provided `i8` clamped to the
    /// acceptable interval.
    ///
    /// The given value gets clamped into the closed interval between
    /// [`ReportedContentScore::MIN`] and [`ReportedContentScore::MAX`].
    pub const fn new_saturating(value: i8) -> Self {
        // `Ord::clamp` is not usable in a `const fn`, hence the explicit
        // comparisons on the raw values.
        if value > Self::MAX.0 {
            Self::MAX
        } else if value < Self::MIN.0 {
            Self::MIN
        } else {
            Self(value)
        }
    }

    /// The value of this score.
    pub const fn value(&self) -> i8 {
        self.0
    }
}
4512
4513impl PartialEq<i8> for ReportedContentScore {
4514    fn eq(&self, other: &i8) -> bool {
4515        self.0.eq(other)
4516    }
4517}
4518
4519impl PartialEq<ReportedContentScore> for i8 {
4520    fn eq(&self, other: &ReportedContentScore) -> bool {
4521        self.eq(&other.0)
4522    }
4523}
4524
4525impl PartialOrd<i8> for ReportedContentScore {
4526    fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4527        self.0.partial_cmp(other)
4528    }
4529}
4530
4531impl PartialOrd<ReportedContentScore> for i8 {
4532    fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4533        self.partial_cmp(&other.0)
4534    }
4535}
4536
4537impl From<ReportedContentScore> for Int {
4538    fn from(value: ReportedContentScore) -> Self {
4539        value.0.into()
4540    }
4541}
4542
4543impl TryFrom<i8> for ReportedContentScore {
4544    type Error = TryFromReportedContentScoreError;
4545
4546    fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4547        if value > Self::MAX || value < Self::MIN {
4548            Err(TryFromReportedContentScoreError(()))
4549        } else {
4550            Ok(Self(value))
4551        }
4552    }
4553}
4554
4555impl TryFrom<i16> for ReportedContentScore {
4556    type Error = TryFromReportedContentScoreError;
4557
4558    fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4559        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4560        value.try_into()
4561    }
4562}
4563
4564impl TryFrom<i32> for ReportedContentScore {
4565    type Error = TryFromReportedContentScoreError;
4566
4567    fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4568        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4569        value.try_into()
4570    }
4571}
4572
4573impl TryFrom<i64> for ReportedContentScore {
4574    type Error = TryFromReportedContentScoreError;
4575
4576    fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4577        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4578        value.try_into()
4579    }
4580}
4581
4582impl TryFrom<Int> for ReportedContentScore {
4583    type Error = TryFromReportedContentScoreError;
4584
4585    fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4586        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4587        value.try_into()
4588    }
4589}
4590
/// Module-private abstraction over something that can provide a
/// [`TimelineEvent`] given its event id.
trait EventSource {
    /// Get the event with the given `event_id`.
    ///
    /// The returned future resolves to the event, or to an [`Error`] if it
    /// could not be obtained. It must be `SendOutsideWasm`.
    fn get_event(
        &self,
        event_id: &EventId,
    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
}
4597
4598impl EventSource for &Room {
4599    async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4600        self.load_or_fetch_event(event_id, None).await
4601    }
4602}
4603
/// The error type returned when a checked `ReportedContentScore` conversion
/// fails.
///
/// The only field is a private unit value, so the error cannot be constructed
/// outside of this module.
#[derive(Debug, Clone, Error)]
#[error("out of range conversion attempted")]
pub struct TryFromReportedContentScoreError(());
4609
/// Contains the current user's room member info and the optional room member
/// info of the sender of the `m.room.member` event that this info represents.
#[derive(Debug)]
pub struct RoomMemberWithSenderInfo {
    /// The actual room member.
    pub room_member: RoomMember,
    /// The info of the sender of the event `room_member` is based on, if
    /// available (`None` otherwise).
    pub sender_info: Option<RoomMember>,
}
4620
4621#[cfg(all(test, not(target_family = "wasm")))]
4622mod tests {
4623    use std::collections::BTreeMap;
4624
4625    use matrix_sdk_base::{store::ComposerDraftType, ComposerDraft};
4626    use matrix_sdk_test::{
4627        async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent,
4628        SyncResponseBuilder,
4629    };
4630    use ruma::{
4631        event_id,
4632        events::{relation::RelationType, room::member::MembershipState},
4633        int, owned_event_id, room_id, user_id, RoomVersionId,
4634    };
4635    use wiremock::{
4636        matchers::{header, method, path_regex},
4637        Mock, MockServer, ResponseTemplate,
4638    };
4639
4640    use super::ReportedContentScore;
4641    use crate::{
4642        config::RequestConfig,
4643        room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4644        test_utils::{
4645            client::mock_matrix_session,
4646            logged_in_client,
4647            mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4648        },
4649        Client,
4650    };
4651
4652    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4653    #[async_test]
4654    async fn test_cache_invalidation_while_encrypt() {
4655        use matrix_sdk_base::store::RoomLoadSettings;
4656        use matrix_sdk_test::{message_like_event_content, DEFAULT_TEST_ROOM_ID};
4657
4658        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4659        let session = mock_matrix_session();
4660
4661        let client = Client::builder()
4662            .homeserver_url("http://localhost:1234")
4663            .request_config(RequestConfig::new().disable_retry())
4664            .sqlite_store(&sqlite_path, None)
4665            .build()
4666            .await
4667            .unwrap();
4668        client
4669            .matrix_auth()
4670            .restore_session(session.clone(), RoomLoadSettings::default())
4671            .await
4672            .unwrap();
4673
4674        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4675
4676        // Mock receiving an event to create an internal room.
4677        let server = MockServer::start().await;
4678        {
4679            Mock::given(method("GET"))
4680                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4681                .and(header("authorization", "Bearer 1234"))
4682                .respond_with(
4683                    ResponseTemplate::new(200)
4684                        .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
4685                )
4686                .mount(&server)
4687                .await;
4688            let response = SyncResponseBuilder::default()
4689                .add_joined_room(
4690                    JoinedRoomBuilder::default()
4691                        .add_state_event(StateTestEvent::Member)
4692                        .add_state_event(StateTestEvent::PowerLevels)
4693                        .add_state_event(StateTestEvent::Encryption),
4694                )
4695                .build_sync_response();
4696            client.base_client().receive_sync_response(response).await.unwrap();
4697        }
4698
4699        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4700
4701        // Step 1, preshare the room keys.
4702        room.preshare_room_key().await.unwrap();
4703
4704        // Step 2, force lock invalidation by pretending another client obtained the
4705        // lock.
4706        {
4707            let client = Client::builder()
4708                .homeserver_url("http://localhost:1234")
4709                .request_config(RequestConfig::new().disable_retry())
4710                .sqlite_store(&sqlite_path, None)
4711                .build()
4712                .await
4713                .unwrap();
4714            client
4715                .matrix_auth()
4716                .restore_session(session.clone(), RoomLoadSettings::default())
4717                .await
4718                .unwrap();
4719            client
4720                .encryption()
4721                .enable_cross_process_store_lock("client2".to_owned())
4722                .await
4723                .unwrap();
4724
4725            let guard = client.encryption().spin_lock_store(None).await.unwrap();
4726            assert!(guard.is_some());
4727        }
4728
4729        // Step 3, take the crypto-store lock.
4730        let guard = client.encryption().spin_lock_store(None).await.unwrap();
4731        assert!(guard.is_some());
4732
4733        // Step 4, try to encrypt a message.
4734        let olm = client.olm_machine().await;
4735        let olm = olm.as_ref().expect("Olm machine wasn't started");
4736
4737        // Now pretend we're encrypting an event; the olm machine shouldn't rely on
4738        // caching the outgoing session before.
4739        let _encrypted_content = olm
4740            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4741            .await
4742            .unwrap();
4743    }
4744
4745    #[test]
4746    fn reported_content_score() {
4747        // i8
4748        let score = ReportedContentScore::new(0).unwrap();
4749        assert_eq!(score.value(), 0);
4750        let score = ReportedContentScore::new(-50).unwrap();
4751        assert_eq!(score.value(), -50);
4752        let score = ReportedContentScore::new(-100).unwrap();
4753        assert_eq!(score.value(), -100);
4754        assert_eq!(ReportedContentScore::new(10), None);
4755        assert_eq!(ReportedContentScore::new(-110), None);
4756
4757        let score = ReportedContentScore::new_saturating(0);
4758        assert_eq!(score.value(), 0);
4759        let score = ReportedContentScore::new_saturating(-50);
4760        assert_eq!(score.value(), -50);
4761        let score = ReportedContentScore::new_saturating(-100);
4762        assert_eq!(score.value(), -100);
4763        let score = ReportedContentScore::new_saturating(10);
4764        assert_eq!(score, ReportedContentScore::MAX);
4765        let score = ReportedContentScore::new_saturating(-110);
4766        assert_eq!(score, ReportedContentScore::MIN);
4767
4768        // i16
4769        let score = ReportedContentScore::try_from(0i16).unwrap();
4770        assert_eq!(score.value(), 0);
4771        let score = ReportedContentScore::try_from(-100i16).unwrap();
4772        assert_eq!(score.value(), -100);
4773        ReportedContentScore::try_from(10i16).unwrap_err();
4774        ReportedContentScore::try_from(-110i16).unwrap_err();
4775
4776        // i32
4777        let score = ReportedContentScore::try_from(0i32).unwrap();
4778        assert_eq!(score.value(), 0);
4779        let score = ReportedContentScore::try_from(-100i32).unwrap();
4780        assert_eq!(score.value(), -100);
4781        ReportedContentScore::try_from(10i32).unwrap_err();
4782        ReportedContentScore::try_from(-110i32).unwrap_err();
4783
4784        // i64
4785        let score = ReportedContentScore::try_from(0i64).unwrap();
4786        assert_eq!(score.value(), 0);
4787        let score = ReportedContentScore::try_from(-100i64).unwrap();
4788        assert_eq!(score.value(), -100);
4789        ReportedContentScore::try_from(10i64).unwrap_err();
4790        ReportedContentScore::try_from(-110i64).unwrap_err();
4791
4792        // Int
4793        let score = ReportedContentScore::try_from(int!(0)).unwrap();
4794        assert_eq!(score.value(), 0);
4795        let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4796        assert_eq!(score.value(), -100);
4797        ReportedContentScore::try_from(int!(10)).unwrap_err();
4798        ReportedContentScore::try_from(int!(-110)).unwrap_err();
4799    }
4800
4801    #[async_test]
4802    async fn test_composer_draft() {
4803        use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4804
4805        let client = logged_in_client(None).await;
4806
4807        let response = SyncResponseBuilder::default()
4808            .add_joined_room(JoinedRoomBuilder::default())
4809            .build_sync_response();
4810        client.base_client().receive_sync_response(response).await.unwrap();
4811        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4812
4813        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4814
4815        // Save 2 drafts, one for the room and one for a thread.
4816
4817        let draft = ComposerDraft {
4818            plain_text: "Hello, world!".to_owned(),
4819            html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4820            draft_type: ComposerDraftType::NewMessage,
4821        };
4822
4823        room.save_composer_draft(draft.clone(), None).await.unwrap();
4824
4825        let thread_root = owned_event_id!("$thread_root:b.c");
4826        let thread_draft = ComposerDraft {
4827            plain_text: "Hello, thread!".to_owned(),
4828            html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4829            draft_type: ComposerDraftType::NewMessage,
4830        };
4831
4832        room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4833
4834        // Check that the room draft was saved correctly
4835        assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4836
4837        // Check that the thread draft was saved correctly
4838        assert_eq!(
4839            room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4840            Some(thread_draft.clone())
4841        );
4842
4843        // Clear the room draft
4844        room.clear_composer_draft(None).await.unwrap();
4845        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4846
4847        // Check that the thread one is still there
4848        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4849
4850        // Clear the thread draft as well
4851        room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4852        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4853    }
4854
4855    #[async_test]
4856    async fn test_mark_join_requests_as_seen() {
4857        let server = MatrixMockServer::new().await;
4858        let client = server.client_builder().build().await;
4859        let event_id = event_id!("$a:b.c");
4860        let room_id = room_id!("!a:b.c");
4861        let user_id = user_id!("@alice:b.c");
4862
4863        let f = EventFactory::new().room(room_id);
4864        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f
4865            .member(user_id)
4866            .membership(MembershipState::Knock)
4867            .event_id(event_id)
4868            .into_raw()]);
4869        let room = server.sync_room(&client, joined_room_builder).await;
4870
4871        // When loading the initial seen ids, there are none
4872        let seen_ids =
4873            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4874        assert!(seen_ids.is_empty());
4875
4876        // We mark a random event id as seen
4877        room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4878            .await
4879            .expect("Couldn't mark join request as seen");
4880
4881        // Then we can check it was successfully marked as seen
4882        let seen_ids =
4883            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4884        assert_eq!(seen_ids.len(), 1);
4885        assert_eq!(
4886            seen_ids.into_iter().next().expect("No next value"),
4887            (event_id.to_owned(), user_id.to_owned())
4888        )
4889    }
4890
4891    #[async_test]
4892    async fn test_own_room_membership_with_no_own_member_event() {
4893        let server = MatrixMockServer::new().await;
4894        let client = server.client_builder().build().await;
4895        let room_id = room_id!("!a:b.c");
4896
4897        let room = server.sync_joined_room(&client, room_id).await;
4898
4899        // Since there is no member event for the own user, the method fails.
4900        // This should never happen in an actual room.
4901        let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4902        assert!(error.is_some());
4903    }
4904
4905    #[async_test]
4906    async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4907        let server = MatrixMockServer::new().await;
4908        let client = server.client_builder().build().await;
4909        let room_id = room_id!("!a:b.c");
4910        let user_id = user_id!("@example:localhost");
4911
4912        let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4913        let joined_room_builder =
4914            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4915        let room = server.sync_room(&client, joined_room_builder).await;
4916
4917        // When we load the membership details
4918        let ret = room
4919            .member_with_sender_info(client.user_id().unwrap())
4920            .await
4921            .expect("Room member info should be available");
4922
4923        // We get the member info for the current user
4924        assert_eq!(ret.room_member.event().user_id(), user_id);
4925
4926        // But there is no info for the sender
4927        assert!(ret.sender_info.is_none());
4928    }
4929
4930    #[async_test]
4931    async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4932        let server = MatrixMockServer::new().await;
4933        let client = server.client_builder().build().await;
4934        let room_id = room_id!("!a:b.c");
4935        let user_id = user_id!("@example:localhost");
4936
4937        let f = EventFactory::new().room(room_id).sender(user_id);
4938        let joined_room_builder =
4939            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4940        let room = server.sync_room(&client, joined_room_builder).await;
4941
4942        // When we load the membership details
4943        let ret = room
4944            .member_with_sender_info(client.user_id().unwrap())
4945            .await
4946            .expect("Room member info should be available");
4947
4948        // We get the current user's member info
4949        assert_eq!(ret.room_member.event().user_id(), user_id);
4950
4951        // And the sender has the same info, since it's also the current user
4952        assert!(ret.sender_info.is_some());
4953        assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
4954    }
4955
4956    #[async_test]
4957    async fn test_own_room_membership_with_own_member_event_and_known_sender() {
4958        let server = MatrixMockServer::new().await;
4959        let client = server.client_builder().build().await;
4960        let room_id = room_id!("!a:b.c");
4961        let user_id = user_id!("@example:localhost");
4962        let sender_id = user_id!("@alice:b.c");
4963
4964        let f = EventFactory::new().room(room_id).sender(sender_id);
4965        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4966            f.member(user_id).into_raw(),
4967            // The sender info comes from the sync
4968            f.member(sender_id).into_raw(),
4969        ]);
4970        let room = server.sync_room(&client, joined_room_builder).await;
4971
4972        // When we load the membership details
4973        let ret = room
4974            .member_with_sender_info(client.user_id().unwrap())
4975            .await
4976            .expect("Room member info should be available");
4977
4978        // We get the current user's member info
4979        assert_eq!(ret.room_member.event().user_id(), user_id);
4980
4981        // And also the sender info from the events received in the sync
4982        assert!(ret.sender_info.is_some());
4983        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
4984    }
4985
4986    #[async_test]
4987    async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
4988        let server = MatrixMockServer::new().await;
4989        let client = server.client_builder().build().await;
4990        let room_id = room_id!("!a:b.c");
4991        let user_id = user_id!("@example:localhost");
4992        let sender_id = user_id!("@alice:b.c");
4993
4994        let f = EventFactory::new().room(room_id).sender(sender_id);
4995        let joined_room_builder =
4996            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4997        let room = server.sync_room(&client, joined_room_builder).await;
4998
4999        // We'll receive the member info through the /members endpoint
5000        server
5001            .mock_get_members()
5002            .ok(vec![f.member(sender_id).into_raw()])
5003            .mock_once()
5004            .mount()
5005            .await;
5006
5007        // We get the current user's member info
5008        let ret = room
5009            .member_with_sender_info(client.user_id().unwrap())
5010            .await
5011            .expect("Room member info should be available");
5012
5013        // We get the current user's member info
5014        assert_eq!(ret.room_member.event().user_id(), user_id);
5015
5016        // And also the sender info from the /members endpoint
5017        assert!(ret.sender_info.is_some());
5018        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5019    }
5020
5021    #[async_test]
5022    async fn test_list_threads() {
5023        let server = MatrixMockServer::new().await;
5024        let client = server.client_builder().build().await;
5025
5026        let room_id = room_id!("!a:b.c");
5027        let sender_id = user_id!("@alice:b.c");
5028        let f = EventFactory::new().room(room_id).sender(sender_id);
5029
5030        let eid1 = event_id!("$1");
5031        let eid2 = event_id!("$2");
5032        let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5033        let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5034
5035        server
5036            .mock_room_threads()
5037            .ok(batch1.clone(), Some("prev_batch".to_owned()))
5038            .mock_once()
5039            .mount()
5040            .await;
5041        server
5042            .mock_room_threads()
5043            .match_from("prev_batch")
5044            .ok(batch2, None)
5045            .mock_once()
5046            .mount()
5047            .await;
5048
5049        let room = server.sync_joined_room(&client, room_id).await;
5050        let result =
5051            room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5052        assert_eq!(result.chunk.len(), 1);
5053        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5054        assert!(result.prev_batch_token.is_some());
5055
5056        let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5057        let result = room.list_threads(opts).await.expect("Failed to list threads");
5058        assert_eq!(result.chunk.len(), 1);
5059        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5060        assert!(result.prev_batch_token.is_none());
5061    }
5062
5063    #[async_test]
5064    async fn test_relations() {
5065        let server = MatrixMockServer::new().await;
5066        let client = server.client_builder().build().await;
5067
5068        let room_id = room_id!("!a:b.c");
5069        let sender_id = user_id!("@alice:b.c");
5070        let f = EventFactory::new().room(room_id).sender(sender_id);
5071
5072        let target_event_id = owned_event_id!("$target");
5073        let eid1 = event_id!("$1");
5074        let eid2 = event_id!("$2");
5075        let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5076        let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5077
5078        server
5079            .mock_room_relations()
5080            .match_target_event(target_event_id.clone())
5081            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5082            .mock_once()
5083            .mount()
5084            .await;
5085
5086        server
5087            .mock_room_relations()
5088            .match_target_event(target_event_id.clone())
5089            .match_from("next_batch")
5090            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5091            .mock_once()
5092            .mount()
5093            .await;
5094
5095        let room = server.sync_joined_room(&client, room_id).await;
5096
5097        // Main endpoint: no relation type filtered out.
5098        let mut opts = RelationsOptions {
5099            include_relations: IncludeRelations::AllRelations,
5100            ..Default::default()
5101        };
5102        let result = room
5103            .relations(target_event_id.clone(), opts.clone())
5104            .await
5105            .expect("Failed to list relations the first time");
5106        assert_eq!(result.chunk.len(), 1);
5107        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5108        assert!(result.prev_batch_token.is_none());
5109        assert!(result.next_batch_token.is_some());
5110        assert!(result.recursion_depth.is_none());
5111
5112        opts.from = result.next_batch_token;
5113        let result = room
5114            .relations(target_event_id, opts)
5115            .await
5116            .expect("Failed to list relations the second time");
5117        assert_eq!(result.chunk.len(), 1);
5118        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5119        assert!(result.prev_batch_token.is_none());
5120        assert!(result.next_batch_token.is_none());
5121        assert!(result.recursion_depth.is_none());
5122    }
5123
5124    #[async_test]
5125    async fn test_relations_with_reltype() {
5126        let server = MatrixMockServer::new().await;
5127        let client = server.client_builder().build().await;
5128
5129        let room_id = room_id!("!a:b.c");
5130        let sender_id = user_id!("@alice:b.c");
5131        let f = EventFactory::new().room(room_id).sender(sender_id);
5132
5133        let target_event_id = owned_event_id!("$target");
5134        let eid1 = event_id!("$1");
5135        let eid2 = event_id!("$2");
5136        let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5137        let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5138
5139        server
5140            .mock_room_relations()
5141            .match_target_event(target_event_id.clone())
5142            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5143            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5144            .mock_once()
5145            .mount()
5146            .await;
5147
5148        server
5149            .mock_room_relations()
5150            .match_target_event(target_event_id.clone())
5151            .match_from("next_batch")
5152            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5153            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5154            .mock_once()
5155            .mount()
5156            .await;
5157
5158        let room = server.sync_joined_room(&client, room_id).await;
5159
5160        // Reltype-filtered endpoint, for threads \o/
5161        let mut opts = RelationsOptions {
5162            include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5163            ..Default::default()
5164        };
5165        let result = room
5166            .relations(target_event_id.clone(), opts.clone())
5167            .await
5168            .expect("Failed to list relations the first time");
5169        assert_eq!(result.chunk.len(), 1);
5170        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5171        assert!(result.prev_batch_token.is_none());
5172        assert!(result.next_batch_token.is_some());
5173        assert!(result.recursion_depth.is_none());
5174
5175        opts.from = result.next_batch_token;
5176        let result = room
5177            .relations(target_event_id, opts)
5178            .await
5179            .expect("Failed to list relations the second time");
5180        assert_eq!(result.chunk.len(), 1);
5181        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5182        assert!(result.prev_batch_token.is_none());
5183        assert!(result.next_batch_token.is_none());
5184        assert!(result.recursion_depth.is_none());
5185    }
5186
5187    #[async_test]
5188    async fn test_power_levels_computation() {
5189        let server = MatrixMockServer::new().await;
5190        let client = server.client_builder().build().await;
5191
5192        let room_id = room_id!("!a:b.c");
5193        let sender_id = client.user_id().expect("No session id");
5194        let f = EventFactory::new().room(room_id).sender(sender_id);
5195        let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5196
5197        // Computing the power levels will need these 3 state events:
5198        let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into_raw();
5199        let power_levels_event = f.power_levels(&mut user_map).state_key("").into_raw();
5200        let room_member_event = f.member(sender_id).into_raw();
5201
5202        // With only the room member event
5203        let room = server
5204            .sync_room(
5205                &client,
5206                JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event.clone()]),
5207            )
5208            .await;
5209        let ctx = room
5210            .push_condition_room_ctx()
5211            .await
5212            .expect("Failed to get push condition context")
5213            .expect("Could not get push condition context");
5214
5215        // The internal power levels couldn't be computed
5216        assert!(ctx.power_levels.is_none());
5217
5218        // Adding the room creation event
5219        let room = server
5220            .sync_room(
5221                &client,
5222                JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event.clone()]),
5223            )
5224            .await;
5225        let ctx = room
5226            .push_condition_room_ctx()
5227            .await
5228            .expect("Failed to get push condition context")
5229            .expect("Could not get push condition context");
5230
5231        // The internal power levels still couldn't be computed
5232        assert!(ctx.power_levels.is_none());
5233
5234        // With the room member, room creation and the power levels events
5235        let room = server
5236            .sync_room(
5237                &client,
5238                JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5239            )
5240            .await;
5241        let ctx = room
5242            .push_condition_room_ctx()
5243            .await
5244            .expect("Failed to get push condition context")
5245            .expect("Could not get push condition context");
5246
5247        // The internal power levels can finally be computed
5248        assert!(ctx.power_levels.is_some());
5249    }
5250}