Skip to main content

matrix_sdk/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18    borrow::Borrow,
19    collections::{BTreeMap, HashMap},
20    future::Future,
21    ops::Deref,
22    sync::Arc,
23    time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30    StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39    IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43    ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44    StateChanges, StateStoreDataKey, StateStoreDataValue,
45    deserialized_responses::{
46        RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47    },
48    media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49    serde_helpers::extract_relation,
50    store::{StateStoreExt, ThreadSubscriptionStatus},
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
54#[cfg(feature = "e2e-encryption")]
55use matrix_sdk_common::BoxFuture;
56use matrix_sdk_common::{
57    deserialized_responses::TimelineEvent,
58    executor::{JoinHandle, spawn},
59    timeout::timeout,
60};
61#[cfg(feature = "experimental-search")]
62use matrix_sdk_search::error::IndexError;
63#[cfg(feature = "experimental-search")]
64#[cfg(doc)]
65use matrix_sdk_search::index::RoomIndex;
66use mime::Mime;
67use reply::Reply;
68#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
69use ruma::events::AnySyncMessageLikeEvent;
70#[cfg(feature = "experimental-encrypted-state-events")]
71use ruma::events::AnySyncStateEvent;
72#[cfg(feature = "unstable-msc4274")]
73use ruma::events::room::message::GalleryItemType;
74#[cfg(feature = "e2e-encryption")]
75use ruma::events::{
76    AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
77};
78use ruma::{
79    EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
80    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
81    api::client::{
82        config::{set_global_account_data, set_room_account_data},
83        context,
84        error::ErrorKind,
85        filter::LazyLoadOptions,
86        membership::{
87            Invite3pid, ban_user, forget_room, get_member_events,
88            invite_user::{self, v3::InvitationRecipient},
89            kick_user, leave_room, unban_user,
90        },
91        message::send_message_event,
92        read_marker::set_read_marker,
93        receipt::create_receipt,
94        redact::redact_event,
95        room::{get_room_event, report_content, report_room},
96        state::{get_state_event_for_key, send_state_event},
97        tag::{create_tag, delete_tag},
98        threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
99        typing::create_typing_event::{self, v3::Typing},
100    },
101    assign,
102    events::{
103        AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
104        Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
105        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
106        RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
107        StaticStateEventContent, SyncStateEvent,
108        beacon::BeaconEventContent,
109        beacon_info::BeaconInfoEventContent,
110        direct::DirectEventContent,
111        marked_unread::MarkedUnreadEventContent,
112        receipt::{Receipt, ReceiptThread, ReceiptType},
113        relation::RelationType,
114        room::{
115            ImageInfo, MediaSource, ThumbnailInfo,
116            avatar::{self, RoomAvatarEventContent},
117            encryption::RoomEncryptionEventContent,
118            history_visibility::HistoryVisibility,
119            member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
120            message::{
121                AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
122                ImageMessageEventContent, MessageType, RoomMessageEventContent,
123                TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
124                UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
125            },
126            name::RoomNameEventContent,
127            pinned_events::RoomPinnedEventsEventContent,
128            power_levels::{
129                RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
130            },
131            server_acl::RoomServerAclEventContent,
132            topic::RoomTopicEventContent,
133        },
134        space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
135        tag::{TagInfo, TagName},
136        typing::SyncTypingEvent,
137    },
138    int,
139    push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
140    serde::Raw,
141    time::Instant,
142    uint,
143};
144#[cfg(feature = "experimental-encrypted-state-events")]
145use ruma::{
146    events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
147    serde::JsonCastable,
148};
149use serde::de::DeserializeOwned;
150use thiserror::Error;
151use tokio::{join, sync::broadcast};
152use tracing::{debug, error, info, instrument, trace, warn};
153
154use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
155pub use self::{
156    member::{RoomMember, RoomMemberRole},
157    messages::{
158        EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
159        Relations, RelationsOptions, ThreadRoots,
160    },
161};
162#[cfg(feature = "e2e-encryption")]
163use crate::encryption::backups::BackupState;
164#[cfg(doc)]
165use crate::event_cache::EventCache;
166#[cfg(feature = "experimental-encrypted-state-events")]
167use crate::room::futures::{SendRawStateEvent, SendStateEvent};
168use crate::{
169    BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
170    attachment::{AttachmentConfig, AttachmentInfo},
171    client::WeakClient,
172    config::RequestConfig,
173    error::{BeaconError, WrongRoomState},
174    event_cache::{self, EventCacheDropHandles, RoomEventCache},
175    event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
176    live_location_share::ObservableLiveLocation,
177    media::{MediaFormat, MediaRequestParameters},
178    notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
179    room::{
180        knock_requests::{KnockRequest, KnockRequestMemberInfo},
181        power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
182        privacy_settings::RoomPrivacySettings,
183    },
184    sync::RoomUpdate,
185    utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
186};
187
188pub mod edit;
189pub mod futures;
190pub mod identity_status_changes;
191/// Contains code related to requests to join a room.
192pub mod knock_requests;
193mod member;
194mod messages;
195pub mod power_levels;
196pub mod reply;
197
198pub mod calls;
199
200/// Contains all the functionality for modifying the privacy settings in a room.
201pub mod privacy_settings;
202
203#[cfg(feature = "e2e-encryption")]
204pub(crate) mod shared_room_history;
205
206/// A struct containing methods that are common for Joined, Invited and Left
207/// Rooms
208#[derive(Debug, Clone)]
209pub struct Room {
210    inner: BaseRoom,
211    pub(crate) client: Client,
212}
213
214impl Deref for Room {
215    type Target = BaseRoom;
216
217    fn deref(&self) -> &Self::Target {
218        &self.inner
219    }
220}
221
222const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
223const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
224
225/// A thread subscription, according to the semantics of MSC4306.
226#[derive(Debug, Clone, Copy, PartialEq, Eq)]
227pub struct ThreadSubscription {
228    /// Whether the subscription was made automatically by a client, not by
229    /// manual user choice.
230    pub automatic: bool,
231}
232
233/// Context allowing to compute the push actions for a given event.
234#[derive(Debug)]
235pub struct PushContext {
236    /// The Ruma context used to compute the push actions.
237    push_condition_room_ctx: PushConditionRoomCtx,
238
239    /// Push rules for this room, based on the push rules state event, or the
240    /// global server default as defined by [`Ruleset::server_default`].
241    push_rules: Ruleset,
242}
243
244impl PushContext {
245    /// Create a new [`PushContext`] from its inner components.
246    pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
247        Self { push_condition_room_ctx, push_rules }
248    }
249
250    /// Compute the push rules for a given event.
251    pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
252        self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
253    }
254
255    /// Compute the push rules for a given event, with extra logging to help
256    /// debugging.
257    #[doc(hidden)]
258    #[instrument(skip_all)]
259    pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
260        let rules = self
261            .push_rules
262            .iter()
263            .filter_map(|r| {
264                if !r.enabled() {
265                    return None;
266                }
267
268                let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
269
270                let conditions = match r {
271                    AnyPushRuleRef::Override(r) => {
272                        format!("{:?}", r.conditions)
273                    }
274                    AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
275                    AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
276                    AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
277                    AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
278                    _ => "<unknown push rule kind>".to_owned(),
279                };
280
281                Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
282            })
283            .collect::<Vec<_>>()
284            .join("\n");
285        trace!("rules:\n\n{rules}\n\n");
286
287        let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
288
289        if let Some(found) = found {
290            trace!("rule {} matched", found.rule_id());
291            found.actions().to_owned()
292        } else {
293            trace!("no match");
294            Vec::new()
295        }
296    }
297}
298
299macro_rules! make_media_type {
300    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
301        // If caption is set, use it as body, and filename as the file name; otherwise,
302        // body is the filename, and the filename is not set.
303        // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
304        let (body, formatted, filename) = match $caption {
305            Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
306            None => ($filename, None, None),
307        };
308
309        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
310
311        match $content_type.type_() {
312            mime::IMAGE => {
313                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
314                    mimetype: Some($content_type.as_ref().to_owned()),
315                    thumbnail_source,
316                    thumbnail_info
317                });
318                let content = assign!(ImageMessageEventContent::new(body, $source), {
319                    info: Some(Box::new(info)),
320                    formatted,
321                    filename
322                });
323                <$t>::Image(content)
324            }
325
326            mime::AUDIO => {
327                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
328                    formatted,
329                    filename
330                });
331
332                if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
333                 let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
334                    let waveform = waveform_vec
335                        .iter()
336                        .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
337                        .map(Into::into)
338                        .collect();
339                    content.audio =
340                        Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
341                }
342
343                if matches!($info, Some(AttachmentInfo::Voice(_))) {
344                    content.voice = Some(UnstableVoiceContentBlock::new());
345                }
346
347                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
348                audio_info.mimetype = Some($content_type.as_ref().to_owned());
349                let content = content.info(Box::new(audio_info));
350
351                <$t>::Audio(content)
352            }
353
354            mime::VIDEO => {
355                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
356                    mimetype: Some($content_type.as_ref().to_owned()),
357                    thumbnail_source,
358                    thumbnail_info
359                });
360                let content = assign!(VideoMessageEventContent::new(body, $source), {
361                    info: Some(Box::new(info)),
362                    formatted,
363                    filename
364                });
365                <$t>::Video(content)
366            }
367
368            _ => {
369                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
370                    mimetype: Some($content_type.as_ref().to_owned()),
371                    thumbnail_source,
372                    thumbnail_info
373                });
374                let content = assign!(FileMessageEventContent::new(body, $source), {
375                    info: Some(Box::new(info)),
376                    formatted,
377                    filename,
378                });
379                <$t>::File(content)
380            }
381        }
382    }};
383}
384
385impl Room {
386    /// Create a new `Room`
387    ///
388    /// # Arguments
389    /// * `client` - The client used to make requests.
390    ///
391    /// * `room` - The underlying room.
392    pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
393        Self { inner: room, client }
394    }
395
396    /// Leave this room.
397    /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
398    /// automatically.
399    ///
400    /// Only invited and joined rooms can be left.
401    #[doc(alias = "reject_invitation")]
402    #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
403    async fn leave_impl(&self) -> (Result<()>, &Room) {
404        let state = self.state();
405        if state == RoomState::Left {
406            return (
407                Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
408                    "Joined or Invited",
409                    state,
410                )))),
411                self,
412            );
413        }
414
415        // If the room was in Invited state we should also forget it when declining the
416        // invite.
417        let should_forget = matches!(self.state(), RoomState::Invited);
418
419        let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
420        let response = self.client.send(request).await;
421
422        // The server can return with an error that is acceptable to ignore. Let's find
423        // which one.
424        if let Err(error) = response {
425            #[allow(clippy::collapsible_match)]
426            let ignore_error = if let Some(error) = error.client_api_error_kind() {
427                match error {
428                    // The user is trying to leave a room but doesn't have permissions to do so.
429                    // Let's consider the user has left the room.
430                    ErrorKind::Forbidden { .. } => true,
431                    _ => false,
432                }
433            } else {
434                false
435            };
436
437            error!(?error, ignore_error, should_forget, "Failed to leave the room");
438
439            if !ignore_error {
440                return (Err(error.into()), self);
441            }
442        }
443
444        if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
445            return (Err(e.into()), self);
446        }
447
448        if should_forget {
449            trace!("Trying to forget the room");
450
451            if let Err(error) = self.forget().await {
452                error!(?error, "Failed to forget the room");
453            }
454        }
455
456        (Ok(()), self)
457    }
458
459    /// Leave this room and all predecessors.
460    /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
461    /// automatically.
462    ///
463    /// Only invited and joined rooms can be left.
464    /// Will return an error if the current room fails to leave but
465    /// will only warn if a predecessor fails to leave.
466    pub async fn leave(&self) -> Result<()> {
467        let mut rooms: Vec<Room> = vec![self.clone()];
468        let mut current_room = self;
469
470        while let Some(predecessor) = current_room.predecessor_room() {
471            let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
472
473            if let Some(predecessor_room) = maybe_predecessor_room {
474                rooms.push(predecessor_room.clone());
475                current_room = rooms.last().expect("Room just pushed so can't be empty");
476            } else {
477                warn!("Cannot find predecessor room");
478                break;
479            }
480        }
481
482        let batch_size = 5;
483
484        let rooms_futures: Vec<_> = rooms
485            .iter()
486            .filter_map(|room| match room.state() {
487                RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
488                    Some(room.leave_impl())
489                }
490                RoomState::Banned | RoomState::Left => None,
491            })
492            .collect();
493
494        let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
495
496        let mut maybe_this_room_failed_with: Option<Error> = None;
497
498        while let Some(result) = futures_stream.next().await {
499            if let (Err(e), room) = result {
500                if room.room_id() == self.room_id() {
501                    maybe_this_room_failed_with = Some(e);
502                } else {
503                    warn!("Failure while attempting to leave predecessor room: {e:?}");
504                }
505            }
506        }
507
508        maybe_this_room_failed_with.map_or(Ok(()), Err)
509    }
510
511    /// Join this room.
512    ///
513    /// Only invited and left rooms can be joined via this method.
514    #[doc(alias = "accept_invitation")]
515    pub async fn join(&self) -> Result<()> {
516        let prev_room_state = self.inner.state();
517
518        if prev_room_state == RoomState::Joined {
519            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
520                "Invited or Left",
521                prev_room_state,
522            ))));
523        }
524
525        self.client.join_room_by_id(self.room_id()).await?;
526
527        Ok(())
528    }
529
530    /// Get the inner client saved in this room instance.
531    ///
532    /// Returns the client this room is part of.
533    pub fn client(&self) -> Client {
534        self.client.clone()
535    }
536
537    /// Get the sync state of this room, i.e. whether it was fully synced with
538    /// the server.
539    pub fn is_synced(&self) -> bool {
540        self.inner.is_state_fully_synced()
541    }
542
543    /// Gets the avatar of this room, if set.
544    ///
545    /// Returns the avatar.
546    /// If a thumbnail is requested no guarantee on the size of the image is
547    /// given.
548    ///
549    /// # Arguments
550    ///
551    /// * `format` - The desired format of the avatar.
552    ///
553    /// # Examples
554    ///
555    /// ```no_run
556    /// # use matrix_sdk::Client;
557    /// # use matrix_sdk::ruma::room_id;
558    /// # use matrix_sdk::media::MediaFormat;
559    /// # use url::Url;
560    /// # let homeserver = Url::parse("http://example.com").unwrap();
561    /// # async {
562    /// # let user = "example";
563    /// let client = Client::new(homeserver).await.unwrap();
564    /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
565    /// let room_id = room_id!("!roomid:example.com");
566    /// let room = client.get_room(&room_id).unwrap();
567    /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
568    ///     std::fs::write("avatar.png", avatar);
569    /// }
570    /// # };
571    /// ```
572    pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
573        let Some(url) = self.avatar_url() else { return Ok(None) };
574        let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
575        Ok(Some(self.client.media().get_media_content(&request, true).await?))
576    }
577
578    /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
579    /// returns a `Messages` struct that contains a chunk of room and state
580    /// events (`RoomEvent` and `AnyStateEvent`).
581    ///
582    /// With the encryption feature, messages are decrypted if possible. If
583    /// decryption fails for an individual message, that message is returned
584    /// undecrypted.
585    ///
586    /// # Examples
587    ///
588    /// ```no_run
589    /// use matrix_sdk::{Client, room::MessagesOptions};
590    /// # use matrix_sdk::ruma::{
591    /// #     api::client::filter::RoomEventFilter,
592    /// #     room_id,
593    /// # };
594    /// # use url::Url;
595    ///
596    /// # let homeserver = Url::parse("http://example.com").unwrap();
597    /// # async {
598    /// let options =
599    ///     MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
600    ///
601    /// let mut client = Client::new(homeserver).await.unwrap();
602    /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
603    /// assert!(room.messages(options).await.is_ok());
604    /// # };
605    /// ```
606    #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
607    pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
608        let room_id = self.inner.room_id();
609        let request = options.into_request(room_id);
610        let http_response = self.client.send(request).await?;
611
612        let push_ctx = self.push_context().await?;
613        let chunk = join_all(
614            http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
615        )
616        .await;
617
618        Ok(Messages {
619            start: http_response.start,
620            end: http_response.end,
621            chunk,
622            state: http_response.state,
623        })
624    }
625
626    /// Register a handler for events of a specific type, within this room.
627    ///
628    /// This method works the same way as [`Client::add_event_handler`], except
629    /// that the handler will only be called for events within this room. See
630    /// that method for more details on event handler functions.
631    ///
632    /// `room.add_event_handler(hdl)` is equivalent to
633    /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
634    /// convenient in your use case.
635    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
636    where
637        Ev: SyncEvent + DeserializeOwned + Send + 'static,
638        H: EventHandler<Ev, Ctx>,
639    {
640        self.client.add_room_event_handler(self.room_id(), handler)
641    }
642
643    /// Subscribe to all updates for this room.
644    ///
645    /// The returned receiver will receive a new message for each sync response
646    /// that contains updates for this room.
647    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
648        self.client.subscribe_to_room_updates(self.room_id())
649    }
650
651    /// Subscribe to typing notifications for this room.
652    ///
653    /// The returned receiver will receive a new vector of user IDs for each
654    /// sync response that contains 'm.typing' event. The current user ID will
655    /// be filtered out.
656    pub fn subscribe_to_typing_notifications(
657        &self,
658    ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
659        let (sender, receiver) = broadcast::channel(16);
660        let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
661            let own_user_id = self.own_user_id().to_owned();
662            move |event: SyncTypingEvent| async move {
663                // Ignore typing notifications from own user.
664                let typing_user_ids = event
665                    .content
666                    .user_ids
667                    .into_iter()
668                    .filter(|user_id| *user_id != own_user_id)
669                    .collect();
670                // Ignore the result. It can only fail if there are no listeners.
671                let _ = sender.send(typing_user_ids);
672            }
673        });
674        let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
675        (drop_guard, receiver)
676    }
677
678    /// Subscribe to updates about users who are in "pin violation" i.e. their
679    /// identity has changed and the user has not yet acknowledged this.
680    ///
681    /// The returned receiver will receive a new vector of
682    /// [`IdentityStatusChange`] each time a /keys/query response shows a
683    /// changed identity for a member of this room, or a sync shows a change
684    /// to the membership of an affected user. (Changes to the current user are
685    /// not directly included, but some changes to the current user's identity
686    /// can trigger changes to how we see other users' identities, which
687    /// will be included.)
688    ///
689    /// The first item in the stream provides the current state of the room:
690    /// each member of the room who is not in "pinned" or "verified" state will
691    /// be included (except the current user).
692    ///
693    /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
694    /// `PinViolation` then a warning should be displayed to the user. If it is
695    /// set to `Pinned` then no warning should be displayed.
696    ///
697    /// Note that if a user who is in pin violation leaves the room, a `Pinned`
698    /// update is sent, to indicate that the warning should be removed, even
699    /// though the user's identity is not necessarily pinned.
700    #[cfg(feature = "e2e-encryption")]
701    pub async fn subscribe_to_identity_status_changes(
702        &self,
703    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
704        IdentityStatusChanges::create_stream(self.clone()).await
705    }
706
707    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
708    /// decrypted if needs be.
709    ///
710    /// Only logs from the crypto crate will indicate a failure to decrypt.
711    #[cfg(not(feature = "experimental-encrypted-state-events"))]
712    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
713    async fn try_decrypt_event(
714        &self,
715        event: Raw<AnyTimelineEvent>,
716        push_ctx: Option<&PushContext>,
717    ) -> TimelineEvent {
718        #[cfg(feature = "e2e-encryption")]
719        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
720            SyncMessageLikeEvent::Original(_),
721        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
722            && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
723        {
724            return event;
725        }
726
727        let mut event = TimelineEvent::from_plaintext(event.cast());
728        if let Some(push_ctx) = push_ctx {
729            event.set_push_actions(push_ctx.for_event(event.raw()).await);
730        }
731
732        event
733    }
734
735    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
736    /// decrypted if needs be.
737    ///
738    /// Only logs from the crypto crate will indicate a failure to decrypt.
739    #[cfg(feature = "experimental-encrypted-state-events")]
740    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
741    async fn try_decrypt_event(
742        &self,
743        event: Raw<AnyTimelineEvent>,
744        push_ctx: Option<&PushContext>,
745    ) -> TimelineEvent {
746        // If we have either an encrypted message-like or state event, try to decrypt.
747        match event.deserialize_as::<AnySyncTimelineEvent>() {
748            Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
749                SyncMessageLikeEvent::Original(_),
750            ))) => {
751                if let Ok(event) = self
752                    .decrypt_event(
753                        event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
754                        push_ctx,
755                    )
756                    .await
757                {
758                    return event;
759                }
760            }
761            Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
762                SyncStateEvent::Original(_),
763            ))) => {
764                if let Ok(event) = self
765                    .decrypt_event(
766                        event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
767                        push_ctx,
768                    )
769                    .await
770                {
771                    return event;
772                }
773            }
774            _ => {}
775        }
776
777        let mut event = TimelineEvent::from_plaintext(event.cast());
778        if let Some(push_ctx) = push_ctx {
779            event.set_push_actions(push_ctx.for_event(event.raw()).await);
780        }
781
782        event
783    }
784
785    /// Fetch the event with the given `EventId` in this room.
786    ///
787    /// It uses the given [`RequestConfig`] if provided, or the client's default
788    /// one otherwise.
789    pub async fn event(
790        &self,
791        event_id: &EventId,
792        request_config: Option<RequestConfig>,
793    ) -> Result<TimelineEvent> {
794        let request =
795            get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
796
797        let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
798        let push_ctx = self.push_context().await?;
799        let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
800
801        // Save the event into the event cache, if it's set up.
802        if let Ok((cache, _handles)) = self.event_cache().await {
803            cache.save_events([event.clone()]).await;
804        }
805
806        Ok(event)
807    }
808
809    /// Try to load the event from the [`EventCache`][crate::event_cache], if
810    /// it's enabled, or fetch it from the homeserver.
811    ///
812    /// When running the request against the homeserver, it uses the given
813    /// [`RequestConfig`] if provided, or the client's default one
814    /// otherwise.
815    pub async fn load_or_fetch_event(
816        &self,
817        event_id: &EventId,
818        request_config: Option<RequestConfig>,
819    ) -> Result<TimelineEvent> {
820        match self.event_cache().await {
821            Ok((event_cache, _drop_handles)) => {
822                if let Some(event) = event_cache.find_event(event_id).await? {
823                    return Ok(event);
824                }
825                // Fallthrough: try with a request.
826            }
827            Err(err) => {
828                debug!("error when getting the event cache: {err}");
829            }
830        }
831        self.event(event_id, request_config).await
832    }
833
834    /// Try to load the event and its relations from the
835    /// [`EventCache`][crate::event_cache], if it's enabled, or fetch it
836    /// from the homeserver.
837    ///
838    /// You can control which types of related events are retrieved using
839    /// `filter`. A `None` value will retrieve any type of related event.
840    ///
841    /// If the event is found in the event cache, but we can't find any
842    /// relations for it there, then we will still attempt to fetch the
843    /// relations from the homeserver.
844    ///
845    /// When running any request against the homeserver, it uses the given
846    /// [`RequestConfig`] if provided, or the client's default one
847    /// otherwise.
848    ///
849    /// Returns a tuple formed of the event and a vector of its relations (that
850    /// can be empty).
851    pub async fn load_or_fetch_event_with_relations(
852        &self,
853        event_id: &EventId,
854        filter: Option<Vec<RelationType>>,
855        request_config: Option<RequestConfig>,
856    ) -> Result<(TimelineEvent, Vec<TimelineEvent>)> {
857        let fetch_relations = async || {
858            // If there's only a single filter, we can use a more efficient request,
859            // specialized on the filter type.
860            //
861            // Otherwise, we need to get all the relations:
862            // - either because no filters implies we fetch all relations,
863            // - or because there are multiple filters and we must filter out manually.
864            let include_relations = if let Some(filter) = &filter
865                && filter.len() == 1
866            {
867                IncludeRelations::RelationsOfType(filter[0].clone())
868            } else {
869                IncludeRelations::AllRelations
870            };
871
872            let mut opts = RelationsOptions {
873                include_relations,
874                recurse: true,
875                limit: Some(uint!(256)),
876                ..Default::default()
877            };
878
879            let mut events = Vec::new();
880            loop {
881                match self.relations(event_id.to_owned(), opts.clone()).await {
882                    Ok(relations) => {
883                        if let Some(filter) = filter.as_ref() {
884                            // Manually filter out the relation types we're interested in.
885                            events.extend(relations.chunk.into_iter().filter_map(|ev| {
886                                let (rel_type, _) = extract_relation(ev.raw())?;
887                                filter
888                                    .iter()
889                                    .any(|ruma_filter| ruma_filter == &rel_type)
890                                    .then_some(ev)
891                            }));
892                        } else {
893                            // No filter: include all events from the response.
894                            events.extend(relations.chunk);
895                        }
896
897                        if let Some(next_from) = relations.next_batch_token {
898                            opts.from = Some(next_from);
899                        } else {
900                            break events;
901                        }
902                    }
903
904                    Err(err) => {
905                        warn!(%event_id, "error when loading relations of pinned event from server: {err}");
906                        break events;
907                    }
908                }
909            }
910        };
911
912        // First, try to load the event *and* its relations from the event cache, all at
913        // once.
914        let event_cache = match self.event_cache().await {
915            Ok((event_cache, drop_handles)) => {
916                if let Some((event, mut relations)) =
917                    event_cache.find_event_with_relations(event_id, filter.clone()).await?
918                {
919                    if relations.is_empty() {
920                        // The event cache doesn't have any relations for this event, try to fetch
921                        // them from the server instead.
922                        relations = fetch_relations().await;
923                    }
924
925                    return Ok((event, relations));
926                }
927
928                // Otherwise, get the event from the server.
929                Some((event_cache, drop_handles))
930            }
931
932            Err(err) => {
933                debug!("error when getting the event cache: {err}");
934                // Fallthrough: try with a request.
935                None
936            }
937        };
938
939        // Fetch the event from the server. A failure here is fatal, as we must return
940        // the target event.
941        let event = self.event(event_id, request_config).await?;
942
943        // Try to get the relations from the event cache (if we have one).
944        if let Some((event_cache, _drop_handles)) = event_cache
945            && let Some(relations) =
946                event_cache.find_event_relations(event_id, filter.clone()).await.ok()
947            && !relations.is_empty()
948        {
949            return Ok((event, relations));
950        }
951
952        // We couldn't find the relations in the event cache; fetch them from the
953        // server.
954        Ok((event, fetch_relations().await))
955    }
956
957    /// Fetch the event with the given `EventId` in this room, using the
958    /// `/context` endpoint to get more information.
959    pub async fn event_with_context(
960        &self,
961        event_id: &EventId,
962        lazy_load_members: bool,
963        context_size: UInt,
964        request_config: Option<RequestConfig>,
965    ) -> Result<EventWithContextResponse> {
966        let mut request =
967            context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
968
969        request.limit = context_size;
970
971        if lazy_load_members {
972            request.filter.lazy_load_options =
973                LazyLoadOptions::Enabled { include_redundant_members: false };
974        }
975
976        let response = self.client.send(request).with_request_config(request_config).await?;
977
978        let push_ctx = self.push_context().await?;
979        let push_ctx = push_ctx.as_ref();
980        let target_event = if let Some(event) = response.event {
981            Some(self.try_decrypt_event(event, push_ctx).await)
982        } else {
983            None
984        };
985
986        // Note: the joined future will fail if any future failed, but
987        // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
988        // decryption error, so we should prevent against most bad cases here.
989        let (events_before, events_after) = join!(
990            join_all(
991                response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
992            ),
993            join_all(
994                response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
995            ),
996        );
997
998        // Save the loaded events into the event cache, if it's set up.
999        if let Ok((cache, _handles)) = self.event_cache().await {
1000            let mut events_to_save: Vec<TimelineEvent> = Vec::new();
1001            if let Some(event) = &target_event {
1002                events_to_save.push(event.clone());
1003            }
1004
1005            for event in &events_before {
1006                events_to_save.push(event.clone());
1007            }
1008
1009            for event in &events_after {
1010                events_to_save.push(event.clone());
1011            }
1012
1013            cache.save_events(events_to_save).await;
1014        }
1015
1016        Ok(EventWithContextResponse {
1017            event: target_event,
1018            events_before,
1019            events_after,
1020            state: response.state,
1021            prev_batch_token: response.start,
1022            next_batch_token: response.end,
1023        })
1024    }
1025
1026    pub(crate) async fn request_members(&self) -> Result<()> {
1027        self.client
1028            .locks()
1029            .members_request_deduplicated_handler
1030            .run(self.room_id().to_owned(), async move {
1031                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
1032                let response = self
1033                    .client
1034                    .send(request.clone())
1035                    .with_request_config(
1036                        // In some cases it can take longer than 30s to load:
1037                        // https://github.com/element-hq/synapse/issues/16872
1038                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
1039                    )
1040                    .await?;
1041
1042                // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
1043                Box::pin(self.client.base_client().receive_all_members(
1044                    self.room_id(),
1045                    &request,
1046                    &response,
1047                ))
1048                .await?;
1049
1050                Ok(())
1051            })
1052            .await
1053    }
1054
1055    /// Request to update the encryption state for this room.
1056    ///
1057    /// It does nothing if the encryption state is already
1058    /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
1059    pub async fn request_encryption_state(&self) -> Result<()> {
1060        if !self.inner.encryption_state().is_unknown() {
1061            return Ok(());
1062        }
1063
1064        self.client
1065            .locks()
1066            .encryption_state_deduplicated_handler
1067            .run(self.room_id().to_owned(), async move {
1068                // Request the event from the server.
1069                let request = get_state_event_for_key::v3::Request::new(
1070                    self.room_id().to_owned(),
1071                    StateEventType::RoomEncryption,
1072                    "".to_owned(),
1073                );
1074                let response = match self.client.send(request).await {
1075                    Ok(response) => Some(
1076                        response
1077                            .into_content()
1078                            .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
1079                    ),
1080                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
1081                    Err(err) => return Err(err.into()),
1082                };
1083
1084                let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
1085
1086                // Persist the event and the fact that we requested it from the server in
1087                // `RoomInfo`.
1088                let mut room_info = self.clone_info();
1089                room_info.mark_encryption_state_synced();
1090                room_info.set_encryption_event(response.clone());
1091                let mut changes = StateChanges::default();
1092                changes.add_room(room_info.clone());
1093
1094                self.client.state_store().save_changes(&changes).await?;
1095                self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1096
1097                Ok(())
1098            })
1099            .await
1100    }
1101
1102    /// Check the encryption state of this room.
1103    ///
1104    /// If the result is [`EncryptionState::Unknown`], one might want to call
1105    /// [`Room::request_encryption_state`].
1106    pub fn encryption_state(&self) -> EncryptionState {
1107        self.inner.encryption_state()
1108    }
1109
1110    /// Force to update the encryption state by calling
1111    /// [`Room::request_encryption_state`], and then calling
1112    /// [`Room::encryption_state`].
1113    ///
1114    /// This method is useful to ensure the encryption state is up-to-date.
1115    pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
1116        self.request_encryption_state().await?;
1117
1118        Ok(self.encryption_state())
1119    }
1120
1121    /// Gets additional context info about the client crypto.
1122    #[cfg(feature = "e2e-encryption")]
1123    pub async fn crypto_context_info(&self) -> CryptoContextInfo {
1124        let encryption = self.client.encryption();
1125
1126        let this_device_is_verified = match encryption.get_own_device().await {
1127            Ok(Some(device)) => device.is_verified_with_cross_signing(),
1128
1129            // Should not happen, there will always be an own device
1130            _ => true,
1131        };
1132
1133        let backup_exists_on_server =
1134            encryption.backups().exists_on_server().await.unwrap_or(false);
1135
1136        CryptoContextInfo {
1137            device_creation_ts: encryption.device_creation_timestamp().await,
1138            this_device_is_verified,
1139            is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1140            backup_exists_on_server,
1141        }
1142    }
1143
1144    fn are_events_visible(&self) -> bool {
1145        if let RoomState::Invited = self.inner.state() {
1146            return matches!(
1147                self.inner.history_visibility_or_default(),
1148                HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1149            );
1150        }
1151
1152        true
1153    }
1154
1155    /// Sync the member list with the server.
1156    ///
1157    /// This method will de-duplicate requests if it is called multiple times in
1158    /// quick succession, in that case the return value will be `None`. This
1159    /// method does nothing if the members are already synced.
1160    pub async fn sync_members(&self) -> Result<()> {
1161        if !self.are_events_visible() {
1162            return Ok(());
1163        }
1164
1165        if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1166    }
1167
1168    /// Get a specific member of this room.
1169    ///
1170    /// *Note*: This method will fetch the members from the homeserver if the
1171    /// member list isn't synchronized due to member lazy loading. Because of
1172    /// that it might panic if it isn't run on a tokio thread.
1173    ///
1174    /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1175    /// method that doesn't do any requests.
1176    ///
1177    /// # Arguments
1178    ///
1179    /// * `user_id` - The ID of the user that should be fetched out of the
1180    ///   store.
1181    pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1182        self.sync_members().await?;
1183        self.get_member_no_sync(user_id).await
1184    }
1185
1186    /// Get a specific member of this room.
1187    ///
1188    /// *Note*: This method will not fetch the members from the homeserver if
1189    /// the member list isn't synchronized due to member lazy loading. Thus,
1190    /// members could be missing.
1191    ///
1192    /// Use [get_member()](#method.get_member) if you want to ensure to always
1193    /// have the full member list to chose from.
1194    ///
1195    /// # Arguments
1196    ///
1197    /// * `user_id` - The ID of the user that should be fetched out of the
1198    ///   store.
1199    pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1200        Ok(self
1201            .inner
1202            .get_member(user_id)
1203            .await?
1204            .map(|member| RoomMember::new(self.client.clone(), member)))
1205    }
1206
1207    /// Get members for this room, with the given memberships.
1208    ///
1209    /// *Note*: This method will fetch the members from the homeserver if the
1210    /// member list isn't synchronized due to member lazy loading. Because of
1211    /// that it might panic if it isn't run on a tokio thread.
1212    ///
1213    /// Use [members_no_sync()](#method.members_no_sync) if you want a
1214    /// method that doesn't do any requests.
1215    pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1216        self.sync_members().await?;
1217        self.members_no_sync(memberships).await
1218    }
1219
1220    /// Get members for this room, with the given memberships.
1221    ///
1222    /// *Note*: This method will not fetch the members from the homeserver if
1223    /// the member list isn't synchronized due to member lazy loading. Thus,
1224    /// members could be missing.
1225    ///
1226    /// Use [members()](#method.members) if you want to ensure to always get
1227    /// the full member list.
1228    pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1229        Ok(self
1230            .inner
1231            .members(memberships)
1232            .await?
1233            .into_iter()
1234            .map(|member| RoomMember::new(self.client.clone(), member))
1235            .collect())
1236    }
1237
1238    /// Sets the display name of the current user within this room.
1239    ///
1240    /// *Note*: This is different to [`crate::Account::set_display_name`] which
1241    /// updates the user's display name across all of their rooms.
1242    pub async fn set_own_member_display_name(
1243        &self,
1244        display_name: Option<String>,
1245    ) -> Result<send_state_event::v3::Response> {
1246        let user_id = self.own_user_id();
1247        let member_event =
1248            self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1249
1250        let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1251            return Err(Error::InsufficientData);
1252        };
1253
1254        let event = raw_event.deserialize()?;
1255
1256        let mut content = match event {
1257            SyncStateEvent::Original(original_event) => original_event.content,
1258            SyncStateEvent::Redacted(redacted_event) => {
1259                RoomMemberEventContent::new(redacted_event.content.membership)
1260            }
1261        };
1262
1263        content.displayname = display_name;
1264        self.send_state_event_for_key(user_id, content).await
1265    }
1266
1267    /// Get all state events of a given type in this room.
1268    pub async fn get_state_events(
1269        &self,
1270        event_type: StateEventType,
1271    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1272        self.client
1273            .state_store()
1274            .get_state_events(self.room_id(), event_type)
1275            .await
1276            .map_err(Into::into)
1277    }
1278
1279    /// Get all state events of a given statically-known type in this room.
1280    ///
1281    /// # Examples
1282    ///
1283    /// ```no_run
1284    /// # async {
1285    /// # let room: matrix_sdk::Room = todo!();
1286    /// use matrix_sdk::ruma::{
1287    ///     events::room::member::RoomMemberEventContent, serde::Raw,
1288    /// };
1289    ///
1290    /// let room_members =
1291    ///     room.get_state_events_static::<RoomMemberEventContent>().await?;
1292    /// # anyhow::Ok(())
1293    /// # };
1294    /// ```
1295    pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1296    where
1297        C: StaticEventContent<IsPrefix = ruma::events::False>
1298            + StaticStateEventContent
1299            + RedactContent,
1300        C::Redacted: RedactedStateEventContent,
1301    {
1302        Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1303    }
1304
1305    /// Get the state events of a given type with the given state keys in this
1306    /// room.
1307    pub async fn get_state_events_for_keys(
1308        &self,
1309        event_type: StateEventType,
1310        state_keys: &[&str],
1311    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1312        self.client
1313            .state_store()
1314            .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1315            .await
1316            .map_err(Into::into)
1317    }
1318
1319    /// Get the state events of a given statically-known type with the given
1320    /// state keys in this room.
1321    ///
1322    /// # Examples
1323    ///
1324    /// ```no_run
1325    /// # async {
1326    /// # let room: matrix_sdk::Room = todo!();
1327    /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1328    /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1329    ///
1330    /// let room_members = room
1331    ///     .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1332    ///         user_ids,
1333    ///     )
1334    ///     .await?;
1335    /// # anyhow::Ok(())
1336    /// # };
1337    /// ```
1338    pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1339        &self,
1340        state_keys: I,
1341    ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1342    where
1343        C: StaticEventContent<IsPrefix = ruma::events::False>
1344            + StaticStateEventContent
1345            + RedactContent,
1346        C::StateKey: Borrow<K>,
1347        C::Redacted: RedactedStateEventContent,
1348        K: AsRef<str> + Sized + Sync + 'a,
1349        I: IntoIterator<Item = &'a K> + Send,
1350        I::IntoIter: Send,
1351    {
1352        Ok(self
1353            .client
1354            .state_store()
1355            .get_state_events_for_keys_static(self.room_id(), state_keys)
1356            .await?)
1357    }
1358
1359    /// Get a specific state event in this room.
1360    pub async fn get_state_event(
1361        &self,
1362        event_type: StateEventType,
1363        state_key: &str,
1364    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1365        self.client
1366            .state_store()
1367            .get_state_event(self.room_id(), event_type, state_key)
1368            .await
1369            .map_err(Into::into)
1370    }
1371
1372    /// Get a specific state event of statically-known type with an empty state
1373    /// key in this room.
1374    ///
1375    /// # Examples
1376    ///
1377    /// ```no_run
1378    /// # async {
1379    /// # let room: matrix_sdk::Room = todo!();
1380    /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1381    ///
1382    /// let power_levels = room
1383    ///     .get_state_event_static::<RoomPowerLevelsEventContent>()
1384    ///     .await?
1385    ///     .expect("every room has a power_levels event")
1386    ///     .deserialize()?;
1387    /// # anyhow::Ok(())
1388    /// # };
1389    /// ```
1390    pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1391    where
1392        C: StaticEventContent<IsPrefix = ruma::events::False>
1393            + StaticStateEventContent<StateKey = EmptyStateKey>
1394            + RedactContent,
1395        C::Redacted: RedactedStateEventContent,
1396    {
1397        self.get_state_event_static_for_key(&EmptyStateKey).await
1398    }
1399
1400    /// Get a specific state event of statically-known type in this room.
1401    ///
1402    /// # Examples
1403    ///
1404    /// ```no_run
1405    /// # async {
1406    /// # let room: matrix_sdk::Room = todo!();
1407    /// use matrix_sdk::ruma::{
1408    ///     events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1409    /// };
1410    ///
1411    /// let member_event = room
1412    ///     .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1413    ///         "@alice:example.org"
1414    ///     ))
1415    ///     .await?;
1416    /// # anyhow::Ok(())
1417    /// # };
1418    /// ```
1419    pub async fn get_state_event_static_for_key<C, K>(
1420        &self,
1421        state_key: &K,
1422    ) -> Result<Option<RawSyncOrStrippedState<C>>>
1423    where
1424        C: StaticEventContent<IsPrefix = ruma::events::False>
1425            + StaticStateEventContent
1426            + RedactContent,
1427        C::StateKey: Borrow<K>,
1428        C::Redacted: RedactedStateEventContent,
1429        K: AsRef<str> + ?Sized + Sync,
1430    {
1431        Ok(self
1432            .client
1433            .state_store()
1434            .get_state_event_static_for_key(self.room_id(), state_key)
1435            .await?)
1436    }
1437
1438    /// Returns the parents this room advertises as its parents.
1439    ///
1440    /// Results are in no particular order.
1441    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1442        // Implements this algorithm:
1443        // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1444
1445        // Get all m.space.parent events for this room
1446        Ok(self
1447            .get_state_events_static::<SpaceParentEventContent>()
1448            .await?
1449            .into_iter()
1450            // Extract state key (ie. the parent's id) and sender
1451            .filter_map(|parent_event| match parent_event.deserialize() {
1452                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1453                    Some((e.state_key.to_owned(), e.sender))
1454                }
1455                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1456                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1457                Err(e) => {
1458                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1459                    None
1460                }
1461            })
1462            // Check whether the parent recognizes this room as its child
1463            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1464                let Some(parent_room) = self.client.get_room(&state_key) else {
1465                    // We are not in the room, cannot check if the relationship is reciprocal
1466                    // TODO: try peeking into the room
1467                    return Ok(ParentSpace::Unverifiable(state_key));
1468                };
1469                // Get the m.space.child state of the parent with this room's id
1470                // as state key.
1471                if let Some(child_event) = parent_room
1472                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1473                    .await?
1474                {
1475                    match child_event.deserialize() {
1476                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1477                            // There is a valid m.space.child in the parent pointing to
1478                            // this room
1479                            return Ok(ParentSpace::Reciprocal(parent_room));
1480                        }
1481                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1482                        Ok(SyncOrStrippedState::Stripped(_)) => {}
1483                        Err(e) => {
1484                            info!(
1485                                room_id = ?self.room_id(), parent_room_id = ?state_key,
1486                                "Could not deserialize m.space.child: {e}"
1487                            );
1488                        }
1489                    }
1490                    // Otherwise the event is either invalid or redacted. If
1491                    // redacted it would be missing the
1492                    // `via` key, thereby invalidating that end of the
1493                    // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1494                }
1495
1496                // No reciprocal m.space.child found, let's check if the sender has the
1497                // power to set it
1498                let Some(member) = parent_room.get_member(&sender).await? else {
1499                    // Sender is not even in the parent room
1500                    return Ok(ParentSpace::Illegitimate(parent_room));
1501                };
1502
1503                if member.can_send_state(StateEventType::SpaceChild) {
1504                    // Sender does have the power to set m.room.child
1505                    Ok(ParentSpace::WithPowerlevel(parent_room))
1506                } else {
1507                    Ok(ParentSpace::Illegitimate(parent_room))
1508                }
1509            })
1510            .collect::<FuturesUnordered<_>>())
1511    }
1512
1513    /// Read account data in this room, from storage.
1514    pub async fn account_data(
1515        &self,
1516        data_type: RoomAccountDataEventType,
1517    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1518        self.client
1519            .state_store()
1520            .get_room_account_data_event(self.room_id(), data_type)
1521            .await
1522            .map_err(Into::into)
1523    }
1524
1525    /// Get account data of a statically-known type in this room, from storage.
1526    ///
1527    /// # Examples
1528    ///
1529    /// ```no_run
1530    /// # async {
1531    /// # let room: matrix_sdk::Room = todo!();
1532    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1533    ///
1534    /// match room.account_data_static::<FullyReadEventContent>().await? {
1535    ///     Some(fully_read) => {
1536    ///         println!("Found read marker: {:?}", fully_read.deserialize()?)
1537    ///     }
1538    ///     None => println!("No read marker for this room"),
1539    /// }
1540    /// # anyhow::Ok(())
1541    /// # };
1542    /// ```
1543    pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1544    where
1545        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1546    {
1547        Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1548    }
1549
1550    /// Check if all members of this room are verified and all their devices are
1551    /// verified.
1552    ///
1553    /// Returns true if all devices in the room are verified, otherwise false.
1554    #[cfg(feature = "e2e-encryption")]
1555    pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1556        let user_ids = self
1557            .client
1558            .state_store()
1559            .get_user_ids(self.room_id(), RoomMemberships::empty())
1560            .await?;
1561
1562        for user_id in user_ids {
1563            let devices = self.client.encryption().get_user_devices(&user_id).await?;
1564            let any_unverified = devices.devices().any(|d| !d.is_verified());
1565
1566            if any_unverified {
1567                return Ok(false);
1568            }
1569        }
1570
1571        Ok(true)
1572    }
1573
1574    /// Set the given account data event for this room.
1575    ///
1576    /// # Example
1577    /// ```
1578    /// # async {
1579    /// # let room: matrix_sdk::Room = todo!();
1580    /// # let event_id: ruma::OwnedEventId = todo!();
1581    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1582    /// let content = FullyReadEventContent::new(event_id);
1583    ///
1584    /// room.set_account_data(content).await?;
1585    /// # anyhow::Ok(())
1586    /// # };
1587    /// ```
1588    pub async fn set_account_data<T>(
1589        &self,
1590        content: T,
1591    ) -> Result<set_room_account_data::v3::Response>
1592    where
1593        T: RoomAccountDataEventContent,
1594    {
1595        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1596
1597        let request = set_room_account_data::v3::Request::new(
1598            own_user.to_owned(),
1599            self.room_id().to_owned(),
1600            &content,
1601        )?;
1602
1603        Ok(self.client.send(request).await?)
1604    }
1605
1606    /// Set the given raw account data event in this room.
1607    ///
1608    /// # Example
1609    /// ```
1610    /// # async {
1611    /// # let room: matrix_sdk::Room = todo!();
1612    /// use matrix_sdk::ruma::{
1613    ///     events::{
1614    ///         AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1615    ///         marked_unread::MarkedUnreadEventContent,
1616    ///     },
1617    ///     serde::Raw,
1618    /// };
1619    /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1620    /// let full_event: AnyRoomAccountDataEventContent =
1621    ///     marked_unread_content.clone().into();
1622    /// room.set_account_data_raw(
1623    ///     marked_unread_content.event_type(),
1624    ///     Raw::new(&full_event).unwrap(),
1625    /// )
1626    /// .await?;
1627    /// # anyhow::Ok(())
1628    /// # };
1629    /// ```
1630    pub async fn set_account_data_raw(
1631        &self,
1632        event_type: RoomAccountDataEventType,
1633        content: Raw<AnyRoomAccountDataEventContent>,
1634    ) -> Result<set_room_account_data::v3::Response> {
1635        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1636
1637        let request = set_room_account_data::v3::Request::new_raw(
1638            own_user.to_owned(),
1639            self.room_id().to_owned(),
1640            event_type,
1641            content,
1642        );
1643
1644        Ok(self.client.send(request).await?)
1645    }
1646
1647    /// Adds a tag to the room, or updates it if it already exists.
1648    ///
1649    /// Returns the [`create_tag::v3::Response`] from the server.
1650    ///
1651    /// # Arguments
1652    /// * `tag` - The tag to add or update.
1653    ///
1654    /// * `tag_info` - Information about the tag, generally containing the
1655    ///   `order` parameter.
1656    ///
1657    /// # Examples
1658    ///
1659    /// ```no_run
1660    /// # use std::str::FromStr;
1661    /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1662    /// # async {
1663    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1664    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1665    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1666    /// use matrix_sdk::ruma::events::tag::TagInfo;
1667    ///
1668    /// if let Some(room) = client.get_room(&room_id) {
1669    ///     let mut tag_info = TagInfo::new();
1670    ///     tag_info.order = Some(0.9);
1671    ///     let user_tag = UserTagName::from_str("u.work")?;
1672    ///
1673    ///     room.set_tag(TagName::User(user_tag), tag_info).await?;
1674    /// }
1675    /// # anyhow::Ok(()) };
1676    /// ```
1677    pub async fn set_tag(
1678        &self,
1679        tag: TagName,
1680        tag_info: TagInfo,
1681    ) -> Result<create_tag::v3::Response> {
1682        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1683        let request = create_tag::v3::Request::new(
1684            user_id.to_owned(),
1685            self.inner.room_id().to_owned(),
1686            tag.to_string(),
1687            tag_info,
1688        );
1689        Ok(self.client.send(request).await?)
1690    }
1691
1692    /// Removes a tag from the room.
1693    ///
1694    /// Returns the [`delete_tag::v3::Response`] from the server.
1695    ///
1696    /// # Arguments
1697    /// * `tag` - The tag to remove.
1698    pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1699        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1700        let request = delete_tag::v3::Request::new(
1701            user_id.to_owned(),
1702            self.inner.room_id().to_owned(),
1703            tag.to_string(),
1704        );
1705        Ok(self.client.send(request).await?)
1706    }
1707
1708    /// Add or remove the `m.favourite` flag for this room.
1709    ///
1710    /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1711    /// room, the tag will be removed too.
1712    ///
1713    /// # Arguments
1714    ///
1715    /// * `is_favourite` - Whether to mark this room as favourite.
1716    /// * `tag_order` - The order of the tag if any.
1717    pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1718        if is_favourite {
1719            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1720
1721            self.set_tag(TagName::Favorite, tag_info).await?;
1722
1723            if self.is_low_priority() {
1724                self.remove_tag(TagName::LowPriority).await?;
1725            }
1726        } else {
1727            self.remove_tag(TagName::Favorite).await?;
1728        }
1729        Ok(())
1730    }
1731
1732    /// Add or remove the `m.lowpriority` flag for this room.
1733    ///
1734    /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1735    /// room, the tag will be removed too.
1736    ///
1737    /// # Arguments
1738    ///
1739    /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1740    /// * `tag_order` - The order of the tag if any.
1741    pub async fn set_is_low_priority(
1742        &self,
1743        is_low_priority: bool,
1744        tag_order: Option<f64>,
1745    ) -> Result<()> {
1746        if is_low_priority {
1747            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1748
1749            self.set_tag(TagName::LowPriority, tag_info).await?;
1750
1751            if self.is_favourite() {
1752                self.remove_tag(TagName::Favorite).await?;
1753            }
1754        } else {
1755            self.remove_tag(TagName::LowPriority).await?;
1756        }
1757        Ok(())
1758    }
1759
1760    /// Sets whether this room is a DM.
1761    ///
1762    /// When setting this room as DM, it will be marked as DM for all active
1763    /// members of the room. When unsetting this room as DM, it will be
1764    /// unmarked as DM for all users, not just the members.
1765    ///
1766    /// # Arguments
1767    /// * `is_direct` - Whether to mark this room as direct.
1768    pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1769        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1770
1771        let mut content = self
1772            .client
1773            .account()
1774            .account_data::<DirectEventContent>()
1775            .await?
1776            .map(|c| c.deserialize())
1777            .transpose()?
1778            .unwrap_or_default();
1779
1780        let this_room_id = self.inner.room_id();
1781
1782        if is_direct {
1783            let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1784            room_members.retain(|member| member.user_id() != self.own_user_id());
1785
1786            for member in room_members {
1787                let entry = content.entry(member.user_id().into()).or_default();
1788                if !entry.iter().any(|room_id| room_id == this_room_id) {
1789                    entry.push(this_room_id.to_owned());
1790                }
1791            }
1792        } else {
1793            for (_, list) in content.iter_mut() {
1794                list.retain(|room_id| *room_id != this_room_id);
1795            }
1796
1797            // Remove user ids that don't have any room marked as DM
1798            content.retain(|_, list| !list.is_empty());
1799        }
1800
1801        let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1802
1803        self.client.send(request).await?;
1804        Ok(())
1805    }
1806
1807    /// Tries to decrypt a room event.
1808    ///
1809    /// # Arguments
1810    /// * `event` - The room event to be decrypted.
1811    ///
1812    /// Returns the decrypted event. In the case of a decryption error, returns
1813    /// a `TimelineEvent` representing the decryption error.
1814    #[cfg(feature = "e2e-encryption")]
1815    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1816    pub async fn decrypt_event(
1817        &self,
1818        event: &Raw<OriginalSyncRoomEncryptedEvent>,
1819        push_ctx: Option<&PushContext>,
1820    ) -> Result<TimelineEvent> {
1821        let machine = self.client.olm_machine().await;
1822        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1823
1824        match machine
1825            .try_decrypt_room_event(
1826                event.cast_ref(),
1827                self.inner.room_id(),
1828                self.client.decryption_settings(),
1829            )
1830            .await?
1831        {
1832            RoomEventDecryptionResult::Decrypted(decrypted) => {
1833                let push_actions = if let Some(push_ctx) = push_ctx {
1834                    Some(push_ctx.for_event(&decrypted.event).await)
1835                } else {
1836                    None
1837                };
1838                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1839            }
1840            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1841                self.client
1842                    .encryption()
1843                    .backups()
1844                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1845                Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1846            }
1847        }
1848    }
1849
1850    /// Tries to decrypt a room event.
1851    ///
1852    /// # Arguments
1853    /// * `event` - The room event to be decrypted.
1854    ///
1855    /// Returns the decrypted event. In the case of a decryption error, returns
1856    /// a `TimelineEvent` representing the decryption error.
1857    #[cfg(feature = "experimental-encrypted-state-events")]
1858    pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1859        &self,
1860        event: &Raw<T>,
1861        push_ctx: Option<&PushContext>,
1862    ) -> Result<TimelineEvent> {
1863        let machine = self.client.olm_machine().await;
1864        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1865
1866        match machine
1867            .try_decrypt_room_event(
1868                event.cast_ref(),
1869                self.inner.room_id(),
1870                self.client.decryption_settings(),
1871            )
1872            .await?
1873        {
1874            RoomEventDecryptionResult::Decrypted(decrypted) => {
1875                let push_actions = if let Some(push_ctx) = push_ctx {
1876                    Some(push_ctx.for_event(&decrypted.event).await)
1877                } else {
1878                    None
1879                };
1880                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1881            }
1882            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1883                self.client
1884                    .encryption()
1885                    .backups()
1886                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1887                // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1888                // event.
1889                Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1890            }
1891        }
1892    }
1893
1894    /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1895    /// session_id.
1896    ///
1897    /// This may be used when we receive an update for a session, and we want to
1898    /// reflect the changes in messages we have received that were encrypted
1899    /// with that session, e.g. to remove a warning shield because a device is
1900    /// now verified.
1901    ///
1902    /// # Arguments
1903    /// * `session_id` - The ID of the Megolm session to get information for.
1904    /// * `sender` - The (claimed) sender of the event where the session was
1905    ///   used.
1906    #[cfg(feature = "e2e-encryption")]
1907    pub async fn get_encryption_info(
1908        &self,
1909        session_id: &str,
1910        sender: &UserId,
1911    ) -> Option<Arc<EncryptionInfo>> {
1912        let machine = self.client.olm_machine().await;
1913        let machine = machine.as_ref()?;
1914        machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1915    }
1916
1917    /// Forces the currently active room key, which is used to encrypt messages,
1918    /// to be rotated.
1919    ///
1920    /// A new room key will be crated and shared with all the room members the
1921    /// next time a message will be sent. You don't have to call this method,
1922    /// room keys will be rotated automatically when necessary. This method is
1923    /// still useful for debugging purposes.
1924    ///
1925    /// For more info please take a look a the [`encryption`] module
1926    /// documentation.
1927    ///
1928    /// [`encryption`]: crate::encryption
1929    #[cfg(feature = "e2e-encryption")]
1930    pub async fn discard_room_key(&self) -> Result<()> {
1931        let machine = self.client.olm_machine().await;
1932        if let Some(machine) = machine.as_ref() {
1933            machine.discard_room_key(self.inner.room_id()).await?;
1934            Ok(())
1935        } else {
1936            Err(Error::NoOlmMachine)
1937        }
1938    }
1939
1940    /// Ban the user with `UserId` from this room.
1941    ///
1942    /// # Arguments
1943    ///
1944    /// * `user_id` - The user to ban with `UserId`.
1945    ///
1946    /// * `reason` - The reason for banning this user.
1947    #[instrument(skip_all)]
1948    pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1949        let request = assign!(
1950            ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1951            { reason: reason.map(ToOwned::to_owned) }
1952        );
1953        self.client.send(request).await?;
1954        Ok(())
1955    }
1956
1957    /// Unban the user with `UserId` from this room.
1958    ///
1959    /// # Arguments
1960    ///
1961    /// * `user_id` - The user to unban with `UserId`.
1962    ///
1963    /// * `reason` - The reason for unbanning this user.
1964    #[instrument(skip_all)]
1965    pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1966        let request = assign!(
1967            unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1968            { reason: reason.map(ToOwned::to_owned) }
1969        );
1970        self.client.send(request).await?;
1971        Ok(())
1972    }
1973
1974    /// Kick a user out of this room.
1975    ///
1976    /// # Arguments
1977    ///
1978    /// * `user_id` - The `UserId` of the user that should be kicked out of the
1979    ///   room.
1980    ///
1981    /// * `reason` - Optional reason why the room member is being kicked out.
1982    #[instrument(skip_all)]
1983    pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1984        let request = assign!(
1985            kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1986            { reason: reason.map(ToOwned::to_owned) }
1987        );
1988        self.client.send(request).await?;
1989        Ok(())
1990    }
1991
1992    /// Invite the specified user by `UserId` to this room.
1993    ///
1994    /// # Arguments
1995    ///
1996    /// * `user_id` - The `UserId` of the user to invite to the room.
1997    #[instrument(skip_all)]
1998    pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1999        #[cfg(feature = "e2e-encryption")]
2000        if self.client.inner.enable_share_history_on_invite {
2001            shared_room_history::share_room_history(self, user_id.to_owned()).await?;
2002        }
2003
2004        let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
2005        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2006        self.client.send(request).await?;
2007
2008        // Force a future room members reload before sending any event to prevent UTDs
2009        // that can happen when some event is sent after a room member has been invited
2010        // but before the /sync request could fetch the membership change event.
2011        self.mark_members_missing();
2012
2013        Ok(())
2014    }
2015
2016    /// Invite the specified user by third party id to this room.
2017    ///
2018    /// # Arguments
2019    ///
2020    /// * `invite_id` - A third party id of a user to invite to the room.
2021    #[instrument(skip_all)]
2022    pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
2023        let recipient = InvitationRecipient::ThirdPartyId(invite_id);
2024        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2025        self.client.send(request).await?;
2026
2027        // Force a future room members reload before sending any event to prevent UTDs
2028        // that can happen when some event is sent after a room member has been invited
2029        // but before the /sync request could fetch the membership change event.
2030        self.mark_members_missing();
2031
2032        Ok(())
2033    }
2034
2035    /// Activate typing notice for this room.
2036    ///
2037    /// The typing notice remains active for 4s. It can be deactivate at any
2038    /// point by setting typing to `false`. If this method is called while
2039    /// the typing notice is active nothing will happen. This method can be
2040    /// called on every key stroke, since it will do nothing while typing is
2041    /// active.
2042    ///
2043    /// # Arguments
2044    ///
2045    /// * `typing` - Whether the user is typing or has stopped typing.
2046    ///
2047    /// # Examples
2048    ///
2049    /// ```no_run
2050    /// use std::time::Duration;
2051    ///
2052    /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
2053    /// # use matrix_sdk::{
2054    /// #     Client, config::SyncSettings,
2055    /// #     ruma::room_id,
2056    /// # };
2057    /// # use url::Url;
2058    ///
2059    /// # async {
2060    /// # let homeserver = Url::parse("http://localhost:8080")?;
2061    /// # let client = Client::new(homeserver).await?;
2062    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2063    ///
2064    /// if let Some(room) = client.get_room(&room_id) {
2065    ///     room.typing_notice(true).await?
2066    /// }
2067    /// # anyhow::Ok(()) };
2068    /// ```
2069    pub async fn typing_notice(&self, typing: bool) -> Result<()> {
2070        self.ensure_room_joined()?;
2071
2072        // Only send a request to the homeserver if the old timeout has elapsed
2073        // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
2074        let send = if let Some(typing_time) =
2075            self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
2076        {
2077            if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
2078                // We always reactivate the typing notice if typing is true or
2079                // we may need to deactivate it if it's
2080                // currently active if typing is false
2081                typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
2082            } else {
2083                // Only send a request when we need to deactivate typing
2084                !typing
2085            }
2086        } else {
2087            // Typing notice is currently deactivated, therefore, send a request
2088            // only when it's about to be activated
2089            typing
2090        };
2091
2092        if send {
2093            self.send_typing_notice(typing).await?;
2094        }
2095
2096        Ok(())
2097    }
2098
2099    #[instrument(name = "typing_notice", skip(self))]
2100    async fn send_typing_notice(&self, typing: bool) -> Result<()> {
2101        let typing = if typing {
2102            self.client
2103                .inner
2104                .typing_notice_times
2105                .write()
2106                .unwrap()
2107                .insert(self.room_id().to_owned(), Instant::now());
2108            Typing::Yes(TYPING_NOTICE_TIMEOUT)
2109        } else {
2110            self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
2111            Typing::No
2112        };
2113
2114        let request = create_typing_event::v3::Request::new(
2115            self.own_user_id().to_owned(),
2116            self.room_id().to_owned(),
2117            typing,
2118        );
2119
2120        self.client.send(request).await?;
2121
2122        Ok(())
2123    }
2124
2125    /// Send a request to set a single receipt.
2126    ///
2127    /// If an unthreaded receipt is sent, this will also unset the unread flag
2128    /// of the room if necessary.
2129    ///
2130    /// # Arguments
2131    ///
2132    /// * `receipt_type` - The type of the receipt to set. Note that it is
2133    ///   possible to set the fully-read marker although it is technically not a
2134    ///   receipt.
2135    ///
2136    /// * `thread` - The thread where this receipt should apply, if any. Note
2137    ///   that this must be [`ReceiptThread::Unthreaded`] when sending a
2138    ///   [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
2139    ///
2140    /// * `event_id` - The `EventId` of the event to set the receipt on.
2141    #[instrument(skip_all)]
2142    pub async fn send_single_receipt(
2143        &self,
2144        receipt_type: create_receipt::v3::ReceiptType,
2145        thread: ReceiptThread,
2146        event_id: OwnedEventId,
2147    ) -> Result<()> {
2148        // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
2149        // string key.
2150        let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2151
2152        self.client
2153            .inner
2154            .locks
2155            .read_receipt_deduplicated_handler
2156            .run((request_key, event_id.clone()), async {
2157                // We will unset the unread flag if we send an unthreaded receipt.
2158                let is_unthreaded = thread == ReceiptThread::Unthreaded;
2159
2160                let mut request = create_receipt::v3::Request::new(
2161                    self.room_id().to_owned(),
2162                    receipt_type,
2163                    event_id,
2164                );
2165                request.thread = thread;
2166
2167                self.client.send(request).await?;
2168
2169                if is_unthreaded {
2170                    self.set_unread_flag(false).await?;
2171                }
2172
2173                Ok(())
2174            })
2175            .await
2176    }
2177
2178    /// Send a request to set multiple receipts at once.
2179    ///
2180    /// This will also unset the unread flag of the room if necessary.
2181    ///
2182    /// # Arguments
2183    ///
2184    /// * `receipts` - The `Receipts` to send.
2185    ///
2186    /// If `receipts` is empty, this is a no-op.
2187    #[instrument(skip_all)]
2188    pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2189        if receipts.is_empty() {
2190            return Ok(());
2191        }
2192
2193        let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2194        let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2195            fully_read,
2196            read_receipt: public_read_receipt,
2197            private_read_receipt,
2198        });
2199
2200        self.client.send(request).await?;
2201
2202        self.set_unread_flag(false).await?;
2203
2204        Ok(())
2205    }
2206
2207    /// Helper function to enable End-to-end encryption in this room.
2208    /// `encrypted_state_events` is not used unless the
2209    /// `experimental-encrypted-state-events` feature is enabled.
2210    #[allow(unused_variables, unused_mut)]
2211    async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2212        use ruma::{
2213            EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2214        };
2215        const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2216
2217        if !self.latest_encryption_state().await?.is_encrypted() {
2218            let mut content =
2219                RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2220            #[cfg(feature = "experimental-encrypted-state-events")]
2221            if encrypted_state_events {
2222                content = content.with_encrypted_state();
2223            }
2224            self.send_state_event(content).await?;
2225
2226            // Spin on the sync beat event, since the first sync we receive might not
2227            // include the encryption event.
2228            //
2229            // TODO do we want to return an error here if we time out? This
2230            // could be quite useful if someone wants to enable encryption and
2231            // send a message right after it's enabled.
2232            let res = timeout(
2233                async {
2234                    loop {
2235                        // Listen for sync events, then check if the encryption state is known.
2236                        self.client.inner.sync_beat.listen().await;
2237                        let _state_store_lock =
2238                            self.client.base_client().state_store_lock().lock().await;
2239
2240                        if !self.inner.encryption_state().is_unknown() {
2241                            break;
2242                        }
2243                    }
2244                },
2245                SYNC_WAIT_TIME,
2246            )
2247            .await;
2248
2249            let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2250
2251            // If encryption was enabled, return.
2252            #[cfg(not(feature = "experimental-encrypted-state-events"))]
2253            if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2254                debug!("room successfully marked as encrypted");
2255                return Ok(());
2256            }
2257
2258            // If encryption with state event encryption was enabled, return.
2259            #[cfg(feature = "experimental-encrypted-state-events")]
2260            if res.is_ok() && {
2261                if encrypted_state_events {
2262                    self.inner.encryption_state().is_state_encrypted()
2263                } else {
2264                    self.inner.encryption_state().is_encrypted()
2265                }
2266            } {
2267                debug!("room successfully marked as encrypted");
2268                return Ok(());
2269            }
2270
2271            // If after waiting for multiple syncs, we don't have the encryption state we
2272            // expect, assume the local encryption state is incorrect; this will
2273            // cause the SDK to re-request it later for confirmation, instead of
2274            // assuming it's sync'd and correct (and not encrypted).
2275            debug!("still not marked as encrypted, marking encryption state as missing");
2276
2277            let mut room_info = self.clone_info();
2278            room_info.mark_encryption_state_missing();
2279            let mut changes = StateChanges::default();
2280            changes.add_room(room_info.clone());
2281
2282            self.client.state_store().save_changes(&changes).await?;
2283            self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2284        }
2285
2286        Ok(())
2287    }
2288
2289    /// Enable End-to-end encryption in this room.
2290    ///
2291    /// This method will be a noop if encryption is already enabled, otherwise
2292    /// sends a `m.room.encryption` state event to the room. This might fail if
2293    /// you don't have the appropriate power level to enable end-to-end
2294    /// encryption.
2295    ///
2296    /// A sync needs to be received to update the local room state. This method
2297    /// will wait for a sync to be received, this might time out if no
2298    /// sync loop is running or if the server is slow.
2299    ///
2300    /// # Examples
2301    ///
2302    /// ```no_run
2303    /// # use matrix_sdk::{
2304    /// #     Client, config::SyncSettings,
2305    /// #     ruma::room_id,
2306    /// # };
2307    /// # use url::Url;
2308    /// #
2309    /// # async {
2310    /// # let homeserver = Url::parse("http://localhost:8080")?;
2311    /// # let client = Client::new(homeserver).await?;
2312    /// # let room_id = room_id!("!test:localhost");
2313    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2314    ///
2315    /// if let Some(room) = client.get_room(&room_id) {
2316    ///     room.enable_encryption().await?
2317    /// }
2318    /// # anyhow::Ok(()) };
2319    /// ```
2320    #[instrument(skip_all)]
2321    pub async fn enable_encryption(&self) -> Result<()> {
2322        self.enable_encryption_inner(false).await
2323    }
2324
2325    /// Enable End-to-end encryption in this room, opting into experimental
2326    /// state event encryption.
2327    ///
2328    /// This method will be a noop if encryption is already enabled, otherwise
2329    /// sends a `m.room.encryption` state event to the room. This might fail if
2330    /// you don't have the appropriate power level to enable end-to-end
2331    /// encryption.
2332    ///
2333    /// A sync needs to be received to update the local room state. This method
2334    /// will wait for a sync to be received, this might time out if no
2335    /// sync loop is running or if the server is slow.
2336    ///
2337    /// # Examples
2338    ///
2339    /// ```no_run
2340    /// # use matrix_sdk::{
2341    /// #     Client, config::SyncSettings,
2342    /// #     ruma::room_id,
2343    /// # };
2344    /// # use url::Url;
2345    /// #
2346    /// # async {
2347    /// # let homeserver = Url::parse("http://localhost:8080")?;
2348    /// # let client = Client::new(homeserver).await?;
2349    /// # let room_id = room_id!("!test:localhost");
2350    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2351    ///
2352    /// if let Some(room) = client.get_room(&room_id) {
2353    ///     room.enable_encryption_with_state_event_encryption().await?
2354    /// }
2355    /// # anyhow::Ok(()) };
2356    /// ```
2357    #[instrument(skip_all)]
2358    #[cfg(feature = "experimental-encrypted-state-events")]
2359    pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2360        self.enable_encryption_inner(true).await
2361    }
2362
2363    /// Share a room key with users in the given room.
2364    ///
2365    /// This will create Olm sessions with all the users/device pairs in the
2366    /// room if necessary and share a room key that can be shared with them.
2367    ///
2368    /// Does nothing if no room key needs to be shared.
2369    // TODO: expose this publicly so people can pre-share a group session if
2370    // e.g. a user starts to type a message for a room.
2371    #[cfg(feature = "e2e-encryption")]
2372    #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
2373    async fn preshare_room_key(&self) -> Result<()> {
2374        self.ensure_room_joined()?;
2375
2376        // Take and release the lock on the store, if needs be.
2377        let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2378        tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
2379
2380        self.client
2381            .locks()
2382            .group_session_deduplicated_handler
2383            .run(self.room_id().to_owned(), async move {
2384                {
2385                    let members = self
2386                        .client
2387                        .state_store()
2388                        .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2389                        .await?;
2390                    self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2391                };
2392
2393                let response = self.share_room_key().await;
2394
2395                // If one of the responses failed invalidate the group
2396                // session as using it would end up in undecryptable
2397                // messages.
2398                if let Err(r) = response {
2399                    let machine = self.client.olm_machine().await;
2400                    if let Some(machine) = machine.as_ref() {
2401                        machine.discard_room_key(self.room_id()).await?;
2402                    }
2403                    return Err(r);
2404                }
2405
2406                Ok(())
2407            })
2408            .await
2409    }
2410
2411    /// Share a group session for a room.
2412    ///
2413    /// # Panics
2414    ///
2415    /// Panics if the client isn't logged in.
2416    #[cfg(feature = "e2e-encryption")]
2417    #[instrument(skip_all)]
2418    async fn share_room_key(&self) -> Result<()> {
2419        self.ensure_room_joined()?;
2420
2421        let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2422
2423        for request in requests {
2424            let response = self.client.send_to_device(&request).await?;
2425            self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2426        }
2427
2428        Ok(())
2429    }
2430
2431    /// Wait for the room to be fully synced.
2432    ///
2433    /// This method makes sure the room that was returned when joining a room
2434    /// has been echoed back in the sync.
2435    ///
2436    /// Warning: This waits until a sync happens and does not return if no sync
2437    /// is happening. It can also return early when the room is not a joined
2438    /// room anymore.
2439    #[instrument(skip_all)]
2440    pub async fn sync_up(&self) {
2441        while !self.is_synced() && self.state() == RoomState::Joined {
2442            let wait_for_beat = self.client.inner.sync_beat.listen();
2443            // We don't care whether it's a timeout or a sync beat.
2444            let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2445        }
2446    }
2447
2448    /// Send a message-like event to this room.
2449    ///
2450    /// Returns the parsed response from the server.
2451    ///
2452    /// If the encryption feature is enabled this method will transparently
2453    /// encrypt the event if this room is encrypted (except for `m.reaction`
2454    /// events, which are never encrypted).
2455    ///
2456    /// **Note**: If you just want to send an event with custom JSON content to
2457    /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2458    ///
2459    /// If you want to set a transaction ID for the event, use
2460    /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2461    /// on the returned value before `.await`ing it.
2462    ///
2463    /// # Arguments
2464    ///
2465    /// * `content` - The content of the message event.
2466    ///
2467    /// # Examples
2468    ///
2469    /// ```no_run
2470    /// # use std::sync::{Arc, RwLock};
2471    /// # use matrix_sdk::{Client, config::SyncSettings};
2472    /// # use url::Url;
2473    /// # use matrix_sdk::ruma::room_id;
2474    /// # use serde::{Deserialize, Serialize};
2475    /// use matrix_sdk::ruma::{
2476    ///     MilliSecondsSinceUnixEpoch, TransactionId,
2477    ///     events::{
2478    ///         macros::EventContent,
2479    ///         room::message::{RoomMessageEventContent, TextMessageEventContent},
2480    ///     },
2481    ///     uint,
2482    /// };
2483    ///
2484    /// # async {
2485    /// # let homeserver = Url::parse("http://localhost:8080")?;
2486    /// # let mut client = Client::new(homeserver).await?;
2487    /// # let room_id = room_id!("!test:localhost");
2488    /// let content = RoomMessageEventContent::text_plain("Hello world");
2489    /// let txn_id = TransactionId::new();
2490    ///
2491    /// if let Some(room) = client.get_room(&room_id) {
2492    ///     room.send(content).with_transaction_id(txn_id).await?;
2493    /// }
2494    ///
2495    /// // Custom events work too:
2496    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2497    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2498    /// struct TokenEventContent {
2499    ///     token: String,
2500    ///     #[serde(rename = "exp")]
2501    ///     expires_at: MilliSecondsSinceUnixEpoch,
2502    /// }
2503    ///
2504    /// # fn generate_token() -> String { todo!() }
2505    /// let content = TokenEventContent {
2506    ///     token: generate_token(),
2507    ///     expires_at: {
2508    ///         let now = MilliSecondsSinceUnixEpoch::now();
2509    ///         MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2510    ///     },
2511    /// };
2512    ///
2513    /// if let Some(room) = client.get_room(&room_id) {
2514    ///     room.send(content).await?;
2515    /// }
2516    /// # anyhow::Ok(()) };
2517    /// ```
2518    pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2519        SendMessageLikeEvent::new(self, content)
2520    }
2521
2522    /// Run /keys/query requests for all the non-tracked users, and for users
2523    /// with an out-of-date device list.
2524    #[cfg(feature = "e2e-encryption")]
2525    async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2526        let olm = self.client.olm_machine().await;
2527        let olm = olm.as_ref().expect("Olm machine wasn't started");
2528
2529        let members =
2530            self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2531
2532        let tracked: HashMap<_, _> = olm
2533            .store()
2534            .load_tracked_users()
2535            .await?
2536            .into_iter()
2537            .map(|tracked| (tracked.user_id, tracked.dirty))
2538            .collect();
2539
2540        // A member has no unknown devices iff it was tracked *and* the tracking is
2541        // not considered dirty.
2542        let members_with_unknown_devices =
2543            members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2544
2545        let (req_id, request) =
2546            olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2547
2548        if !request.device_keys.is_empty() {
2549            self.client.keys_query(&req_id, request.device_keys).await?;
2550        }
2551
2552        Ok(())
2553    }
2554
2555    /// Send a message-like event with custom JSON content to this room.
2556    ///
2557    /// Returns the parsed response from the server.
2558    ///
2559    /// If the encryption feature is enabled this method will transparently
2560    /// encrypt the event if this room is encrypted (except for `m.reaction`
2561    /// events, which are never encrypted).
2562    ///
2563    /// This method is equivalent to the [`send()`][Self::send] method but
2564    /// allows sending custom JSON payloads, e.g. constructed using the
2565    /// [`serde_json::json!()`] macro.
2566    ///
2567    /// If you want to set a transaction ID for the event, use
2568    /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2569    /// on the returned value before `.await`ing it.
2570    ///
2571    /// # Arguments
2572    ///
2573    /// * `event_type` - The type of the event.
2574    ///
2575    /// * `content` - The content of the event as a raw JSON value. The argument
2576    ///   type can be `serde_json::Value`, but also other raw JSON types; for
2577    ///   the full list check the documentation of
2578    ///   [`IntoRawMessageLikeEventContent`].
2579    ///
2580    /// # Examples
2581    ///
2582    /// ```no_run
2583    /// # use std::sync::{Arc, RwLock};
2584    /// # use matrix_sdk::{Client, config::SyncSettings};
2585    /// # use url::Url;
2586    /// # use matrix_sdk::ruma::room_id;
2587    /// # async {
2588    /// # let homeserver = Url::parse("http://localhost:8080")?;
2589    /// # let mut client = Client::new(homeserver).await?;
2590    /// # let room_id = room_id!("!test:localhost");
2591    /// use serde_json::json;
2592    ///
2593    /// if let Some(room) = client.get_room(&room_id) {
2594    ///     room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2595    /// }
2596    /// # anyhow::Ok(()) };
2597    /// ```
2598    #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2599    pub fn send_raw<'a>(
2600        &'a self,
2601        event_type: &'a str,
2602        content: impl IntoRawMessageLikeEventContent,
2603    ) -> SendRawMessageLikeEvent<'a> {
2604        // Note: the recorded instrument fields are saved in
2605        // `SendRawMessageLikeEvent::into_future`.
2606        SendRawMessageLikeEvent::new(self, event_type, content)
2607    }
2608
2609    /// Send an attachment to this room.
2610    ///
2611    /// This will upload the given data that the reader produces using the
2612    /// [`upload()`] method and post an event to the given room.
2613    /// If the room is encrypted and the encryption feature is enabled the
2614    /// upload will be encrypted.
2615    ///
2616    /// This is a convenience method that calls the
2617    /// [`upload()`] and afterwards the [`send()`].
2618    ///
2619    /// # Arguments
2620    /// * `filename` - The file name.
2621    ///
2622    /// * `content_type` - The type of the media, this will be used as the
2623    /// content-type header.
2624    ///
2625    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2626    /// media.
2627    ///
2628    /// * `config` - Metadata and configuration for the attachment.
2629    ///
2630    /// # Examples
2631    ///
2632    /// ```no_run
2633    /// # use std::fs;
2634    /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2635    /// # use url::Url;
2636    /// # use mime;
2637    /// # async {
2638    /// # let homeserver = Url::parse("http://localhost:8080")?;
2639    /// # let mut client = Client::new(homeserver).await?;
2640    /// # let room_id = room_id!("!test:localhost");
2641    /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2642    ///
2643    /// if let Some(room) = client.get_room(&room_id) {
2644    ///     room.send_attachment(
2645    ///         "my_favorite_cat.jpg",
2646    ///         &mime::IMAGE_JPEG,
2647    ///         image,
2648    ///         AttachmentConfig::new(),
2649    ///     ).await?;
2650    /// }
2651    /// # anyhow::Ok(()) };
2652    /// ```
2653    ///
2654    /// [`upload()`]: crate::Media::upload
2655    /// [`send()`]: Self::send
2656    #[instrument(skip_all)]
2657    pub fn send_attachment<'a>(
2658        &'a self,
2659        filename: impl Into<String>,
2660        content_type: &'a Mime,
2661        data: Vec<u8>,
2662        config: AttachmentConfig,
2663    ) -> SendAttachment<'a> {
2664        SendAttachment::new(self, filename.into(), content_type, data, config)
2665    }
2666
2667    /// Prepare and send an attachment to this room.
2668    ///
2669    /// This will upload the given data that the reader produces using the
2670    /// [`upload()`](#method.upload) method and post an event to the given room.
2671    /// If the room is encrypted and the encryption feature is enabled the
2672    /// upload will be encrypted.
2673    ///
2674    /// This is a convenience method that calls the
2675    /// [`Client::upload()`](#Client::method.upload) and afterwards the
2676    /// [`send()`](#method.send).
2677    ///
2678    /// # Arguments
2679    /// * `filename` - The file name.
2680    ///
2681    /// * `content_type` - The type of the media, this will be used as the
2682    ///   content-type header.
2683    ///
2684    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2685    ///   media.
2686    ///
2687    /// * `config` - Metadata and configuration for the attachment.
2688    ///
2689    /// * `send_progress` - An observable to transmit forward progress about the
2690    ///   upload.
2691    ///
2692    /// * `store_in_cache` - A boolean defining whether the uploaded media will
2693    ///   be stored in the cache immediately after a successful upload.
2694    #[instrument(skip_all)]
2695    pub(super) async fn prepare_and_send_attachment<'a>(
2696        &'a self,
2697        filename: String,
2698        content_type: &'a Mime,
2699        data: Vec<u8>,
2700        mut config: AttachmentConfig,
2701        send_progress: SharedObservable<TransmissionProgress>,
2702        store_in_cache: bool,
2703    ) -> Result<send_message_event::v3::Response> {
2704        self.ensure_room_joined()?;
2705
2706        let txn_id = config.txn_id.take();
2707        let mentions = config.mentions.take();
2708
2709        let thumbnail = config.thumbnail.take();
2710
2711        // If necessary, store caching data for the thumbnail ahead of time.
2712        let thumbnail_cache_info = if store_in_cache {
2713            thumbnail
2714                .as_ref()
2715                .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2716        } else {
2717            None
2718        };
2719
2720        #[cfg(feature = "e2e-encryption")]
2721        let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2722            self.client
2723                .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2724                .await?
2725        } else {
2726            self.client
2727                .media()
2728                .upload_plain_media_and_thumbnail(
2729                    content_type,
2730                    // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2731                    // similar.
2732                    data.clone(),
2733                    thumbnail,
2734                    send_progress,
2735                )
2736                .await?
2737        };
2738
2739        #[cfg(not(feature = "e2e-encryption"))]
2740        let (media_source, thumbnail) = self
2741            .client
2742            .media()
2743            .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2744            .await?;
2745
2746        if store_in_cache {
2747            let media_store_lock_guard = self.client.media_store().lock().await?;
2748
2749            // A failure to cache shouldn't prevent the whole upload from finishing
2750            // properly, so only log errors during caching.
2751
2752            debug!("caching the media");
2753            let request =
2754                MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2755
2756            if let Err(err) = media_store_lock_guard
2757                .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2758                .await
2759            {
2760                warn!("unable to cache the media after uploading it: {err}");
2761            }
2762
2763            if let Some(((data, height, width), source)) =
2764                thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2765            {
2766                debug!("caching the thumbnail");
2767
2768                let request = MediaRequestParameters {
2769                    source: source.clone(),
2770                    format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2771                };
2772
2773                if let Err(err) = media_store_lock_guard
2774                    .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2775                    .await
2776                {
2777                    warn!("unable to cache the media after uploading it: {err}");
2778                }
2779            }
2780        }
2781
2782        let content = self
2783            .make_media_event(
2784                Room::make_attachment_type(
2785                    content_type,
2786                    filename,
2787                    media_source,
2788                    config.caption,
2789                    config.info,
2790                    thumbnail,
2791                ),
2792                mentions,
2793                config.reply,
2794            )
2795            .await?;
2796
2797        let mut fut = self.send(content);
2798        if let Some(txn_id) = txn_id {
2799            fut = fut.with_transaction_id(txn_id);
2800        }
2801
2802        fut.await.map(|result| result.response)
2803    }
2804
2805    /// Creates the inner [`MessageType`] for an already-uploaded media file
2806    /// provided by its source.
2807    #[allow(clippy::too_many_arguments)]
2808    pub(crate) fn make_attachment_type(
2809        content_type: &Mime,
2810        filename: String,
2811        source: MediaSource,
2812        caption: Option<TextMessageEventContent>,
2813        info: Option<AttachmentInfo>,
2814        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2815    ) -> MessageType {
2816        make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2817    }
2818
2819    /// Creates the [`RoomMessageEventContent`] based on the message type,
2820    /// mentions and reply information.
2821    pub(crate) async fn make_media_event(
2822        &self,
2823        msg_type: MessageType,
2824        mentions: Option<Mentions>,
2825        reply: Option<Reply>,
2826    ) -> Result<RoomMessageEventContent> {
2827        let mut content = RoomMessageEventContent::new(msg_type);
2828        if let Some(mentions) = mentions {
2829            content = content.add_mentions(mentions);
2830        }
2831        if let Some(reply) = reply {
2832            // Since we just created the event, there is no relation attached to it. Thus,
2833            // it is safe to add the reply relation without overriding anything.
2834            content = self.make_reply_event(content.into(), reply).await?;
2835        }
2836        Ok(content)
2837    }
2838
2839    /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
2840    /// provided by its source.
2841    #[cfg(feature = "unstable-msc4274")]
2842    #[allow(clippy::too_many_arguments)]
2843    pub(crate) fn make_gallery_item_type(
2844        content_type: &Mime,
2845        filename: String,
2846        source: MediaSource,
2847        caption: Option<TextMessageEventContent>,
2848        info: Option<AttachmentInfo>,
2849        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2850    ) -> GalleryItemType {
2851        make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2852    }
2853
2854    /// Update the power levels of a select set of users of this room.
2855    ///
2856    /// Issue a `power_levels` state event request to the server, changing the
2857    /// given UserId -> Int levels. May fail if the `power_levels` aren't
2858    /// locally known yet or the server rejects the state event update, e.g.
2859    /// because of insufficient permissions. Neither permissions to update
2860    /// nor whether the data might be stale is checked prior to issuing the
2861    /// request.
2862    pub async fn update_power_levels(
2863        &self,
2864        updates: Vec<(&UserId, Int)>,
2865    ) -> Result<send_state_event::v3::Response> {
2866        let mut power_levels = self.power_levels().await?;
2867
2868        for (user_id, new_level) in updates {
2869            if new_level == power_levels.users_default {
2870                power_levels.users.remove(user_id);
2871            } else {
2872                power_levels.users.insert(user_id.to_owned(), new_level);
2873            }
2874        }
2875
2876        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2877    }
2878
2879    /// Applies a set of power level changes to this room.
2880    ///
2881    /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2882    /// remain unchanged.
2883    pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2884        let mut power_levels = self.power_levels().await?;
2885        power_levels.apply(changes)?;
2886        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2887        Ok(())
2888    }
2889
2890    /// Resets the room's power levels to the default values
2891    ///
2892    /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2893    pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2894        let creators = self.creators().unwrap_or_default();
2895        let rules = self.clone_info().room_version_rules_or_default();
2896
2897        let default_power_levels =
2898            RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2899        let changes = RoomPowerLevelChanges::from(default_power_levels);
2900        self.apply_power_level_changes(changes).await?;
2901        Ok(self.power_levels().await?)
2902    }
2903
2904    /// Gets the suggested role for the user with the provided `user_id`.
2905    ///
2906    /// This method checks the `RoomPowerLevels` events instead of loading the
2907    /// member list and looking for the member.
2908    pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2909        let power_level = self.get_user_power_level(user_id).await?;
2910        Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2911    }
2912
2913    /// Gets the power level the user with the provided `user_id`.
2914    ///
2915    /// This method checks the `RoomPowerLevels` events instead of loading the
2916    /// member list and looking for the member.
2917    pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2918        let event = self.power_levels().await?;
2919        Ok(event.for_user(user_id))
2920    }
2921
2922    /// Gets a map with the `UserId` of users with power levels other than `0`
2923    /// and this power level.
2924    pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2925        let power_levels = self.power_levels().await.ok();
2926        let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2927        if let Some(power_levels) = power_levels {
2928            for (id, level) in power_levels.users.into_iter() {
2929                user_power_levels.insert(id, level.into());
2930            }
2931        }
2932        user_power_levels
2933    }
2934
2935    /// Sets the name of this room.
2936    pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2937        self.send_state_event(RoomNameEventContent::new(name)).await
2938    }
2939
2940    /// Sets a new topic for this room.
2941    pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2942        self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2943    }
2944
2945    /// Sets the new avatar url for this room.
2946    ///
2947    /// # Arguments
2948    /// * `avatar_url` - The owned Matrix uri that represents the avatar
2949    /// * `info` - The optional image info that can be provided for the avatar
2950    pub async fn set_avatar_url(
2951        &self,
2952        url: &MxcUri,
2953        info: Option<avatar::ImageInfo>,
2954    ) -> Result<send_state_event::v3::Response> {
2955        self.ensure_room_joined()?;
2956
2957        let mut room_avatar_event = RoomAvatarEventContent::new();
2958        room_avatar_event.url = Some(url.to_owned());
2959        room_avatar_event.info = info.map(Box::new);
2960
2961        self.send_state_event(room_avatar_event).await
2962    }
2963
2964    /// Removes the avatar from the room
2965    pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2966        self.send_state_event(RoomAvatarEventContent::new()).await
2967    }
2968
2969    /// Uploads a new avatar for this room.
2970    ///
2971    /// # Arguments
2972    /// * `mime` - The mime type describing the data
2973    /// * `data` - The data representation of the avatar
2974    /// * `info` - The optional image info provided for the avatar, the blurhash
2975    ///   and the mimetype will always be updated
2976    pub async fn upload_avatar(
2977        &self,
2978        mime: &Mime,
2979        data: Vec<u8>,
2980        info: Option<avatar::ImageInfo>,
2981    ) -> Result<send_state_event::v3::Response> {
2982        self.ensure_room_joined()?;
2983
2984        let upload_response = self.client.media().upload(mime, data, None).await?;
2985        let mut info = info.unwrap_or_default();
2986        info.blurhash = upload_response.blurhash;
2987        info.mimetype = Some(mime.to_string());
2988
2989        self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2990    }
2991
2992    /// Send a state event with an empty state key to the homeserver.
2993    ///
2994    /// For state events with a non-empty state key, see
2995    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
2996    ///
2997    /// Returns the parsed response from the server.
2998    ///
2999    /// # Arguments
3000    ///
3001    /// * `content` - The content of the state event.
3002    ///
3003    /// # Examples
3004    ///
3005    /// ```no_run
3006    /// # use serde::{Deserialize, Serialize};
3007    /// # async {
3008    /// # let joined_room: matrix_sdk::Room = todo!();
3009    /// use matrix_sdk::ruma::{
3010    ///     EventEncryptionAlgorithm,
3011    ///     events::{
3012    ///         EmptyStateKey, macros::EventContent,
3013    ///         room::encryption::RoomEncryptionEventContent,
3014    ///     },
3015    /// };
3016    ///
3017    /// let encryption_event_content = RoomEncryptionEventContent::new(
3018    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
3019    /// );
3020    /// joined_room.send_state_event(encryption_event_content).await?;
3021    ///
3022    /// // Custom event:
3023    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3024    /// #[ruma_event(
3025    ///     type = "org.matrix.msc_9000.xxx",
3026    ///     kind = State,
3027    ///     state_key_type = EmptyStateKey,
3028    /// )]
3029    /// struct XxxStateEventContent {/* fields... */}
3030    ///
3031    /// let content: XxxStateEventContent = todo!();
3032    /// joined_room.send_state_event(content).await?;
3033    /// # anyhow::Ok(()) };
3034    /// ```
3035    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3036    #[instrument(skip_all)]
3037    pub async fn send_state_event(
3038        &self,
3039        content: impl StateEventContent<StateKey = EmptyStateKey>,
3040    ) -> Result<send_state_event::v3::Response> {
3041        self.send_state_event_for_key(&EmptyStateKey, content).await
3042    }
3043
3044    /// Send a state event with an empty state key to the homeserver.
3045    ///
3046    /// For state events with a non-empty state key, see
3047    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3048    ///
3049    /// If the experimental state event encryption feature is enabled, this
3050    /// method will transparently encrypt the event if this room is
3051    /// encrypted (except if the event type is considered critical for the room
3052    /// to function, as outlined in [MSC4362][msc4362]).
3053    ///
3054    /// Returns the parsed response from the server.
3055    ///
3056    /// # Arguments
3057    ///
3058    /// * `content` - The content of the state event.
3059    ///
3060    /// # Examples
3061    ///
3062    /// ```no_run
3063    /// # use serde::{Deserialize, Serialize};
3064    /// # async {
3065    /// # let joined_room: matrix_sdk::Room = todo!();
3066    /// use matrix_sdk::ruma::{
3067    ///     EventEncryptionAlgorithm,
3068    ///     events::{
3069    ///         EmptyStateKey, macros::EventContent,
3070    ///         room::encryption::RoomEncryptionEventContent,
3071    ///     },
3072    /// };
3073    ///
3074    /// let encryption_event_content = RoomEncryptionEventContent::new(
3075    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
3076    /// );
3077    /// joined_room.send_state_event(encryption_event_content).await?;
3078    ///
3079    /// // Custom event:
3080    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3081    /// #[ruma_event(
3082    ///     type = "org.matrix.msc_9000.xxx",
3083    ///     kind = State,
3084    ///     state_key_type = EmptyStateKey,
3085    /// )]
3086    /// struct XxxStateEventContent {/* fields... */}
3087    ///
3088    /// let content: XxxStateEventContent = todo!();
3089    /// joined_room.send_state_event(content).await?;
3090    /// # anyhow::Ok(()) };
3091    /// ```
3092    ///
3093    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/4362-encrypted-state.md
3094    #[cfg(feature = "experimental-encrypted-state-events")]
3095    #[instrument(skip_all)]
3096    pub fn send_state_event<'a>(
3097        &'a self,
3098        content: impl StateEventContent<StateKey = EmptyStateKey>,
3099    ) -> SendStateEvent<'a> {
3100        self.send_state_event_for_key(&EmptyStateKey, content)
3101    }
3102
3103    /// Send a state event to the homeserver.
3104    ///
3105    /// Returns the parsed response from the server.
3106    ///
3107    /// # Arguments
3108    ///
3109    /// * `content` - The content of the state event.
3110    ///
3111    /// * `state_key` - A unique key which defines the overwriting semantics for
3112    ///   this piece of room state.
3113    ///
3114    /// # Examples
3115    ///
3116    /// ```no_run
3117    /// # use serde::{Deserialize, Serialize};
3118    /// # async {
3119    /// # let joined_room: matrix_sdk::Room = todo!();
3120    /// use matrix_sdk::ruma::{
3121    ///     events::{
3122    ///         macros::EventContent,
3123    ///         room::member::{RoomMemberEventContent, MembershipState},
3124    ///     },
3125    ///     mxc_uri,
3126    /// };
3127    ///
3128    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3129    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3130    /// content.avatar_url = Some(avatar_url);
3131    ///
3132    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3133    ///
3134    /// // Custom event:
3135    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3136    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3137    /// struct XxxStateEventContent { /* fields... */ }
3138    ///
3139    /// let content: XxxStateEventContent = todo!();
3140    /// joined_room.send_state_event_for_key("foo", content).await?;
3141    /// # anyhow::Ok(()) };
3142    /// ```
3143    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3144    pub async fn send_state_event_for_key<C, K>(
3145        &self,
3146        state_key: &K,
3147        content: C,
3148    ) -> Result<send_state_event::v3::Response>
3149    where
3150        C: StateEventContent,
3151        C::StateKey: Borrow<K>,
3152        K: AsRef<str> + ?Sized,
3153    {
3154        self.ensure_room_joined()?;
3155        let request =
3156            send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3157        let response = self.client.send(request).await?;
3158        Ok(response)
3159    }
3160
3161    /// Send a state event to the homeserver. If state encryption is enabled in
3162    /// this room, the event will be encrypted.
3163    ///
3164    /// If the experimental state event encryption feature is enabled, this
3165    /// method will transparently encrypt the event if this room is
3166    /// encrypted (except if the event type is considered critical for the room
3167    /// to function, as outlined in [MSC4362][msc4362]).
3168    ///
3169    /// Returns the parsed response from the server.
3170    ///
3171    /// # Arguments
3172    ///
3173    /// * `content` - The content of the state event.
3174    ///
3175    /// * `state_key` - A unique key which defines the overwriting semantics for
3176    ///   this piece of room state.
3177    ///
3178    /// # Examples
3179    ///
3180    /// ```no_run
3181    /// # use serde::{Deserialize, Serialize};
3182    /// # async {
3183    /// # let joined_room: matrix_sdk::Room = todo!();
3184    /// use matrix_sdk::ruma::{
3185    ///     events::{
3186    ///         macros::EventContent,
3187    ///         room::member::{RoomMemberEventContent, MembershipState},
3188    ///     },
3189    ///     mxc_uri,
3190    /// };
3191    ///
3192    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3193    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3194    /// content.avatar_url = Some(avatar_url);
3195    ///
3196    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3197    ///
3198    /// // Custom event:
3199    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3200    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3201    /// struct XxxStateEventContent { /* fields... */ }
3202    ///
3203    /// let content: XxxStateEventContent = todo!();
3204    /// joined_room.send_state_event_for_key("foo", content).await?;
3205    /// # anyhow::Ok(()) };
3206    /// ```
3207    ///
3208    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3209    #[cfg(feature = "experimental-encrypted-state-events")]
3210    pub fn send_state_event_for_key<'a, C, K>(
3211        &'a self,
3212        state_key: &K,
3213        content: C,
3214    ) -> SendStateEvent<'a>
3215    where
3216        C: StateEventContent,
3217        C::StateKey: Borrow<K>,
3218        K: AsRef<str> + ?Sized,
3219    {
3220        SendStateEvent::new(self, state_key, content)
3221    }
3222
3223    /// Send a raw room state event to the homeserver.
3224    ///
3225    /// Returns the parsed response from the server.
3226    ///
3227    /// # Arguments
3228    ///
3229    /// * `event_type` - The type of the event that we're sending out.
3230    ///
3231    /// * `state_key` - A unique key which defines the overwriting semantics for
3232    /// this piece of room state. This value is often a zero-length string.
3233    ///
3234    /// * `content` - The content of the event as a raw JSON value. The argument
3235    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3236    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3237    ///
3238    /// # Examples
3239    ///
3240    /// ```no_run
3241    /// use serde_json::json;
3242    ///
3243    /// # async {
3244    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3245    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3246    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3247    ///
3248    /// if let Some(room) = client.get_room(&room_id) {
3249    ///     room.send_state_event_raw("m.room.member", "", json!({
3250    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3251    ///         "displayname": "Alice Margatroid",
3252    ///         "membership": "join",
3253    ///     })).await?;
3254    /// }
3255    /// # anyhow::Ok(()) };
3256    /// ```
3257    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3258    #[instrument(skip_all)]
3259    pub async fn send_state_event_raw(
3260        &self,
3261        event_type: &str,
3262        state_key: &str,
3263        content: impl IntoRawStateEventContent,
3264    ) -> Result<send_state_event::v3::Response> {
3265        self.ensure_room_joined()?;
3266
3267        let request = send_state_event::v3::Request::new_raw(
3268            self.room_id().to_owned(),
3269            event_type.into(),
3270            state_key.to_owned(),
3271            content.into_raw_state_event_content(),
3272        );
3273
3274        Ok(self.client.send(request).await?)
3275    }
3276
3277    /// Send a raw room state event to the homeserver.
3278    ///
3279    /// If the experimental state event encryption feature is enabled, this
3280    /// method will transparently encrypt the event if this room is
3281    /// encrypted (except if the event type is considered critical for the room
3282    /// to function, as outlined in [MSC4362][msc4362]).
3283    ///
3284    /// Returns the parsed response from the server.
3285    ///
3286    /// # Arguments
3287    ///
3288    /// * `event_type` - The type of the event that we're sending out.
3289    ///
3290    /// * `state_key` - A unique key which defines the overwriting semantics for
3291    /// this piece of room state. This value is often a zero-length string.
3292    ///
3293    /// * `content` - The content of the event as a raw JSON value. The argument
3294    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3295    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3296    ///
3297    /// # Examples
3298    ///
3299    /// ```no_run
3300    /// use serde_json::json;
3301    ///
3302    /// # async {
3303    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3304    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3305    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3306    ///
3307    /// if let Some(room) = client.get_room(&room_id) {
3308    ///     room.send_state_event_raw("m.room.member", "", json!({
3309    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3310    ///         "displayname": "Alice Margatroid",
3311    ///         "membership": "join",
3312    ///     })).await?;
3313    /// }
3314    /// # anyhow::Ok(()) };
3315    /// ```
3316    ///
3317    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3318    #[cfg(feature = "experimental-encrypted-state-events")]
3319    #[instrument(skip_all)]
3320    pub fn send_state_event_raw<'a>(
3321        &'a self,
3322        event_type: &'a str,
3323        state_key: &'a str,
3324        content: impl IntoRawStateEventContent,
3325    ) -> SendRawStateEvent<'a> {
3326        SendRawStateEvent::new(self, event_type, state_key, content)
3327    }
3328
3329    /// Strips all information out of an event of the room.
3330    ///
3331    /// Returns the [`redact_event::v3::Response`] from the server.
3332    ///
3333    /// This cannot be undone. Users may redact their own events, and any user
3334    /// with a power level greater than or equal to the redact power level of
3335    /// the room may redact events there.
3336    ///
3337    /// # Arguments
3338    ///
3339    /// * `event_id` - The ID of the event to redact
3340    ///
3341    /// * `reason` - The reason for the event being redacted.
3342    ///
3343    /// * `txn_id` - A unique ID that can be attached to this event as
3344    /// its transaction ID. If not given one is created for the message.
3345    ///
3346    /// # Examples
3347    ///
3348    /// ```no_run
3349    /// use matrix_sdk::ruma::event_id;
3350    ///
3351    /// # async {
3352    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3353    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3354    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3355    /// #
3356    /// if let Some(room) = client.get_room(&room_id) {
3357    ///     let event_id = event_id!("$xxxxxx:example.org");
3358    ///     let reason = Some("Indecent material");
3359    ///     room.redact(&event_id, reason, None).await?;
3360    /// }
3361    /// # anyhow::Ok(()) };
3362    /// ```
3363    #[instrument(skip_all)]
3364    pub async fn redact(
3365        &self,
3366        event_id: &EventId,
3367        reason: Option<&str>,
3368        txn_id: Option<OwnedTransactionId>,
3369    ) -> HttpResult<redact_event::v3::Response> {
3370        let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3371        let request = assign!(
3372            redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3373            { reason: reason.map(ToOwned::to_owned) }
3374        );
3375
3376        self.client.send(request).await
3377    }
3378
3379    /// Get a list of servers that should know this room.
3380    ///
3381    /// Uses the synced members of the room and the suggested [routing
3382    /// algorithm] from the Matrix spec.
3383    ///
3384    /// Returns at most three servers.
3385    ///
3386    /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3387    pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3388        let acl_ev = self
3389            .get_state_event_static::<RoomServerAclEventContent>()
3390            .await?
3391            .and_then(|ev| ev.deserialize().ok());
3392        let acl = acl_ev.as_ref().and_then(|ev| match ev {
3393            SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3394            SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3395        });
3396
3397        // Filter out server names that:
3398        // - Are blocked due to server ACLs
3399        // - Are IP addresses
3400        let members: Vec<_> = self
3401            .members_no_sync(RoomMemberships::JOIN)
3402            .await?
3403            .into_iter()
3404            .filter(|member| {
3405                let server = member.user_id().server_name();
3406                acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3407            })
3408            .collect();
3409
3410        // Get the server of the highest power level user in the room, provided
3411        // they are at least power level 50.
3412        let max = members
3413            .iter()
3414            .max_by_key(|member| member.power_level())
3415            .filter(|max| max.power_level() >= int!(50))
3416            .map(|member| member.user_id().server_name());
3417
3418        // Sort the servers by population.
3419        let servers = members
3420            .iter()
3421            .map(|member| member.user_id().server_name())
3422            .filter(|server| max.filter(|max| max == server).is_none())
3423            .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3424                *servers.entry(server).or_default() += 1;
3425                servers
3426            });
3427        let mut servers: Vec<_> = servers.into_iter().collect();
3428        servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3429
3430        Ok(max
3431            .into_iter()
3432            .chain(servers.into_iter().map(|(name, _)| name))
3433            .take(3)
3434            .map(ToOwned::to_owned)
3435            .collect())
3436    }
3437
3438    /// Get a `matrix.to` permalink to this room.
3439    ///
3440    /// If this room has an alias, we use it. Otherwise, we try to use the
3441    /// synced members in the room for [routing] the room ID.
3442    ///
3443    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3444    pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3445        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3446            return Ok(alias.matrix_to_uri());
3447        }
3448
3449        let via = self.route().await?;
3450        Ok(self.room_id().matrix_to_uri_via(via))
3451    }
3452
3453    /// Get a `matrix:` permalink to this room.
3454    ///
3455    /// If this room has an alias, we use it. Otherwise, we try to use the
3456    /// synced members in the room for [routing] the room ID.
3457    ///
3458    /// # Arguments
3459    ///
3460    /// * `join` - Whether the user should join the room.
3461    ///
3462    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3463    pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3464        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3465            return Ok(alias.matrix_uri(join));
3466        }
3467
3468        let via = self.route().await?;
3469        Ok(self.room_id().matrix_uri_via(via, join))
3470    }
3471
3472    /// Get a `matrix.to` permalink to an event in this room.
3473    ///
3474    /// We try to use the synced members in the room for [routing] the room ID.
3475    ///
3476    /// *Note*: This method does not check if the given event ID is actually
3477    /// part of this room. It needs to be checked before calling this method
3478    /// otherwise the permalink won't work.
3479    ///
3480    /// # Arguments
3481    ///
3482    /// * `event_id` - The ID of the event.
3483    ///
3484    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3485    pub async fn matrix_to_event_permalink(
3486        &self,
3487        event_id: impl Into<OwnedEventId>,
3488    ) -> Result<MatrixToUri> {
3489        // Don't use the alias because an event is tied to a room ID, but an
3490        // alias might point to another room, e.g. after a room upgrade.
3491        let via = self.route().await?;
3492        Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3493    }
3494
3495    /// Get a `matrix:` permalink to an event in this room.
3496    ///
3497    /// We try to use the synced members in the room for [routing] the room ID.
3498    ///
3499    /// *Note*: This method does not check if the given event ID is actually
3500    /// part of this room. It needs to be checked before calling this method
3501    /// otherwise the permalink won't work.
3502    ///
3503    /// # Arguments
3504    ///
3505    /// * `event_id` - The ID of the event.
3506    ///
3507    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3508    pub async fn matrix_event_permalink(
3509        &self,
3510        event_id: impl Into<OwnedEventId>,
3511    ) -> Result<MatrixUri> {
3512        // Don't use the alias because an event is tied to a room ID, but an
3513        // alias might point to another room, e.g. after a room upgrade.
3514        let via = self.route().await?;
3515        Ok(self.room_id().matrix_event_uri_via(event_id, via))
3516    }
3517
3518    /// Get the latest receipt of a user in this room.
3519    ///
3520    /// # Arguments
3521    ///
3522    /// * `receipt_type` - The type of receipt to get.
3523    ///
3524    /// * `thread` - The thread containing the event of the receipt, if any.
3525    ///
3526    /// * `user_id` - The ID of the user.
3527    ///
3528    /// Returns the ID of the event on which the receipt applies and the
3529    /// receipt.
3530    pub async fn load_user_receipt(
3531        &self,
3532        receipt_type: ReceiptType,
3533        thread: ReceiptThread,
3534        user_id: &UserId,
3535    ) -> Result<Option<(OwnedEventId, Receipt)>> {
3536        self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3537    }
3538
3539    /// Load the receipts for an event in this room from storage.
3540    ///
3541    /// # Arguments
3542    ///
3543    /// * `receipt_type` - The type of receipt to get.
3544    ///
3545    /// * `thread` - The thread containing the event of the receipt, if any.
3546    ///
3547    /// * `event_id` - The ID of the event.
3548    ///
3549    /// Returns a list of IDs of users who have sent a receipt for the event and
3550    /// the corresponding receipts.
3551    pub async fn load_event_receipts(
3552        &self,
3553        receipt_type: ReceiptType,
3554        thread: ReceiptThread,
3555        event_id: &EventId,
3556    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3557        self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3558    }
3559
3560    /// Get the push-condition context for this room.
3561    ///
3562    /// Returns `None` if some data couldn't be found. This should only happen
3563    /// in brand new rooms, while we process its state.
3564    pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3565        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3566    }
3567
3568    /// Get the push-condition context for this room, with a choice to include
3569    /// thread subscriptions or not, based on the extra
3570    /// `with_threads_subscriptions` parameter.
3571    ///
3572    /// Returns `None` if some data couldn't be found. This should only happen
3573    /// in brand new rooms, while we process its state.
3574    pub(crate) async fn push_condition_room_ctx_internal(
3575        &self,
3576        with_threads_subscriptions: bool,
3577    ) -> Result<Option<PushConditionRoomCtx>> {
3578        let room_id = self.room_id();
3579        let user_id = self.own_user_id();
3580        let room_info = self.clone_info();
3581        let member_count = room_info.active_members_count();
3582
3583        let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3584            member.name().to_owned()
3585        } else {
3586            return Ok(None);
3587        };
3588
3589        let power_levels = match self.power_levels().await {
3590            Ok(power_levels) => Some(power_levels.into()),
3591            Err(error) => {
3592                if matches!(room_info.state(), RoomState::Joined) {
3593                    // It's normal to not have the power levels in a non-joined room, so don't log
3594                    // the error if the room is not joined
3595                    error!("Could not compute power levels for push conditions: {error}");
3596                }
3597                None
3598            }
3599        };
3600
3601        let mut ctx = assign!(PushConditionRoomCtx::new(
3602            room_id.to_owned(),
3603            UInt::new(member_count).unwrap_or(UInt::MAX),
3604            user_id.to_owned(),
3605            user_display_name,
3606        ),
3607        {
3608            power_levels,
3609        });
3610
3611        if with_threads_subscriptions {
3612            let this = self.clone();
3613            ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3614                let room = this.clone();
3615                Box::pin(async move {
3616                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3617                        maybe_sub.is_some()
3618                    } else {
3619                        false
3620                    }
3621                })
3622            });
3623        }
3624
3625        Ok(Some(ctx))
3626    }
3627
3628    /// Retrieves a [`PushContext`] that can be used to compute the push
3629    /// actions for events.
3630    pub async fn push_context(&self) -> Result<Option<PushContext>> {
3631        self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3632    }
3633
3634    /// Retrieves a [`PushContext`] that can be used to compute the push actions
3635    /// for events, with a choice to include thread subscriptions or not,
3636    /// based on the extra `with_threads_subscriptions` parameter.
3637    #[instrument(skip(self))]
3638    pub(crate) async fn push_context_internal(
3639        &self,
3640        with_threads_subscriptions: bool,
3641    ) -> Result<Option<PushContext>> {
3642        let Some(push_condition_room_ctx) =
3643            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3644        else {
3645            debug!("Could not aggregate push context");
3646            return Ok(None);
3647        };
3648        let push_rules = self.client().account().push_rules().await?;
3649        Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3650    }
3651
3652    /// Get the push actions for the given event with the current room state.
3653    ///
3654    /// Note that it is possible that no push action is returned because the
3655    /// current room state does not have all the required state events.
3656    pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3657        if let Some(ctx) = self.push_context().await? {
3658            Ok(Some(ctx.for_event(event).await))
3659        } else {
3660            Ok(None)
3661        }
3662    }
3663
3664    /// The membership details of the (latest) invite for the logged-in user in
3665    /// this room.
3666    pub async fn invite_details(&self) -> Result<Invite> {
3667        let state = self.state();
3668
3669        if state != RoomState::Invited {
3670            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3671        }
3672
3673        let invitee = self
3674            .get_member_no_sync(self.own_user_id())
3675            .await?
3676            .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3677        let event = invitee.event();
3678
3679        let inviter_id = event.sender().to_owned();
3680        let inviter = self.get_member_no_sync(&inviter_id).await?;
3681
3682        Ok(Invite { invitee, inviter_id, inviter })
3683    }
3684
3685    /// Get the membership details for the current user.
3686    ///
3687    /// Returns:
3688    ///     - If the user was present in the room, a
3689    ///       [`RoomMemberWithSenderInfo`] containing both the user info and the
3690    ///       member info of the sender of the `m.room.member` event.
3691    ///     - If the current user is not present, an error.
3692    pub async fn member_with_sender_info(
3693        &self,
3694        user_id: &UserId,
3695    ) -> Result<RoomMemberWithSenderInfo> {
3696        let Some(member) = self.get_member_no_sync(user_id).await? else {
3697            return Err(Error::InsufficientData);
3698        };
3699
3700        let sender_member =
3701            if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3702                // If the sender room member info is already available, return it
3703                Some(member)
3704            } else if self.are_members_synced() {
3705                // The room members are synced and we couldn't find the sender info
3706                None
3707            } else if self.sync_members().await.is_ok() {
3708                // Try getting the sender room member info again after syncing
3709                self.get_member_no_sync(member.event().sender()).await?
3710            } else {
3711                None
3712            };
3713
3714        Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3715    }
3716
3717    /// Forget this room.
3718    ///
3719    /// This communicates to the homeserver that it should forget the room.
3720    ///
3721    /// Only left or banned-from rooms can be forgotten.
3722    pub async fn forget(&self) -> Result<()> {
3723        let state = self.state();
3724        match state {
3725            RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3726                return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3727                    "Left / Banned",
3728                    state,
3729                ))));
3730            }
3731            RoomState::Left | RoomState::Banned => {}
3732        }
3733
3734        let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3735        let _response = self.client.send(request).await?;
3736
3737        // If it was a DM, remove the room from the `m.direct` global account data.
3738        if self.inner.direct_targets_length() != 0
3739            && let Err(e) = self.set_is_direct(false).await
3740        {
3741            // It is not important whether we managed to remove the room, it will not have
3742            // any consequences, so just log the error.
3743            warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3744        }
3745
3746        self.client.base_client().forget_room(self.inner.room_id()).await?;
3747
3748        Ok(())
3749    }
3750
3751    fn ensure_room_joined(&self) -> Result<()> {
3752        let state = self.state();
3753        if state == RoomState::Joined {
3754            Ok(())
3755        } else {
3756            Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3757        }
3758    }
3759
3760    /// Get the notification mode.
3761    pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3762        if !matches!(self.state(), RoomState::Joined) {
3763            return None;
3764        }
3765
3766        let notification_settings = self.client().notification_settings().await;
3767
3768        // Get the user-defined mode if available
3769        let notification_mode =
3770            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3771
3772        if notification_mode.is_some() {
3773            notification_mode
3774        } else if let Ok(is_encrypted) =
3775            self.latest_encryption_state().await.map(|state| state.is_encrypted())
3776        {
3777            // Otherwise, if encrypted status is available, get the default mode for this
3778            // type of room.
3779            // From the point of view of notification settings, a `one-to-one` room is one
3780            // that involves exactly two people.
3781            let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3782            let default_mode = notification_settings
3783                .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3784                .await;
3785            Some(default_mode)
3786        } else {
3787            None
3788        }
3789    }
3790
3791    /// Get the user-defined notification mode.
3792    ///
3793    /// The result is cached for fast and non-async call. To read the cached
3794    /// result, use
3795    /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3796    //
3797    // Note for maintainers:
3798    //
3799    // The fact the result is cached is an important property. If you change that in
3800    // the future, please review all calls to this method.
3801    pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3802        if !matches!(self.state(), RoomState::Joined) {
3803            return None;
3804        }
3805
3806        let notification_settings = self.client().notification_settings().await;
3807
3808        // Get the user-defined mode if available.
3809        let mode =
3810            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3811
3812        if let Some(mode) = mode {
3813            self.update_cached_user_defined_notification_mode(mode);
3814        }
3815
3816        mode
3817    }
3818
3819    /// Report an event as inappropriate to the homeserver's administrator.
3820    ///
3821    /// # Arguments
3822    ///
3823    /// * `event_id` - The ID of the event to report.
3824    /// * `score` - The score to rate this content.
3825    /// * `reason` - The reason the content is being reported.
3826    ///
3827    /// # Errors
3828    ///
3829    /// Returns an error if the room is not joined or if an error occurs with
3830    /// the request.
3831    pub async fn report_content(
3832        &self,
3833        event_id: OwnedEventId,
3834        score: Option<ReportedContentScore>,
3835        reason: Option<String>,
3836    ) -> Result<report_content::v3::Response> {
3837        let state = self.state();
3838        if state != RoomState::Joined {
3839            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3840        }
3841
3842        let request = report_content::v3::Request::new(
3843            self.inner.room_id().to_owned(),
3844            event_id,
3845            score.map(Into::into),
3846            reason,
3847        );
3848        Ok(self.client.send(request).await?)
3849    }
3850
3851    /// Reports a room as inappropriate to the server.
3852    /// The caller is not required to be joined to the room to report it.
3853    ///
3854    /// # Arguments
3855    ///
3856    /// * `reason` - The reason the room is being reported.
3857    ///
3858    /// # Errors
3859    ///
3860    /// Returns an error if the room is not found or on rate limit
3861    pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3862        let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3863
3864        Ok(self.client.send(request).await?)
3865    }
3866
3867    /// Set a flag on the room to indicate that the user has explicitly marked
3868    /// it as (un)read.
3869    ///
3870    /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3871    /// value as `unread`.
3872    pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3873        if self.is_marked_unread() == unread {
3874            // The request is not necessary.
3875            return Ok(());
3876        }
3877
3878        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3879
3880        let content = MarkedUnreadEventContent::new(unread);
3881
3882        let request = set_room_account_data::v3::Request::new(
3883            user_id.to_owned(),
3884            self.inner.room_id().to_owned(),
3885            &content,
3886        )?;
3887
3888        self.client.send(request).await?;
3889        Ok(())
3890    }
3891
3892    /// Returns the [`RoomEventCache`] associated to this room, assuming the
3893    /// global [`EventCache`] has been enabled for subscription.
3894    pub async fn event_cache(
3895        &self,
3896    ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3897        self.client.event_cache().for_room(self.room_id()).await
3898    }
3899
3900    /// Get the beacon information event in the room for the `user_id`.
3901    ///
3902    /// # Errors
3903    ///
3904    /// Returns an error if the event is redacted, stripped, not found or could
3905    /// not be deserialized.
3906    pub(crate) async fn get_user_beacon_info(
3907        &self,
3908        user_id: &UserId,
3909    ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3910        let raw_event = self
3911            .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3912            .await?
3913            .ok_or(BeaconError::NotFound)?;
3914
3915        match raw_event.deserialize()? {
3916            SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3917            SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3918            SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3919        }
3920    }
3921
3922    /// Start sharing live location in the room.
3923    ///
3924    /// # Arguments
3925    ///
3926    /// * `duration_millis` - The duration for which the live location is
3927    ///   shared, in milliseconds.
3928    /// * `description` - An optional description for the live location share.
3929    ///
3930    /// # Errors
3931    ///
3932    /// Returns an error if the room is not joined or if the state event could
3933    /// not be sent.
3934    pub async fn start_live_location_share(
3935        &self,
3936        duration_millis: u64,
3937        description: Option<String>,
3938    ) -> Result<send_state_event::v3::Response> {
3939        self.ensure_room_joined()?;
3940
3941        self.send_state_event_for_key(
3942            self.own_user_id(),
3943            BeaconInfoEventContent::new(
3944                description,
3945                Duration::from_millis(duration_millis),
3946                true,
3947                None,
3948            ),
3949        )
3950        .await
3951    }
3952
3953    /// Stop sharing live location in the room.
3954    ///
3955    /// # Errors
3956    ///
3957    /// Returns an error if the room is not joined, if the beacon information
3958    /// is redacted or stripped, or if the state event is not found.
3959    pub async fn stop_live_location_share(
3960        &self,
3961    ) -> Result<send_state_event::v3::Response, BeaconError> {
3962        self.ensure_room_joined()?;
3963
3964        let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3965        beacon_info_event.content.stop();
3966        Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3967    }
3968
3969    /// Send a location beacon event in the current room.
3970    ///
3971    /// # Arguments
3972    ///
3973    /// * `geo_uri` - The geo URI of the location beacon.
3974    ///
3975    /// # Errors
3976    ///
3977    /// Returns an error if the room is not joined, if the beacon information
3978    /// is redacted or stripped, if the location share is no longer live,
3979    /// or if the state event is not found.
3980    pub async fn send_location_beacon(
3981        &self,
3982        geo_uri: String,
3983    ) -> Result<send_message_event::v3::Response, BeaconError> {
3984        self.ensure_room_joined()?;
3985
3986        let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3987
3988        if beacon_info_event.content.is_live() {
3989            let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3990            Ok(self.send(content).await?.response)
3991        } else {
3992            Err(BeaconError::NotLive)
3993        }
3994    }
3995
3996    /// Store the given `ComposerDraft` in the state store using the current
3997    /// room id and optional thread root id as identifier.
3998    pub async fn save_composer_draft(
3999        &self,
4000        draft: ComposerDraft,
4001        thread_root: Option<&EventId>,
4002    ) -> Result<()> {
4003        self.client
4004            .state_store()
4005            .set_kv_data(
4006                StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
4007                StateStoreDataValue::ComposerDraft(draft),
4008            )
4009            .await?;
4010        Ok(())
4011    }
4012
4013    /// Retrieve the `ComposerDraft` stored in the state store for this room
4014    /// and given thread, if any.
4015    pub async fn load_composer_draft(
4016        &self,
4017        thread_root: Option<&EventId>,
4018    ) -> Result<Option<ComposerDraft>> {
4019        let data = self
4020            .client
4021            .state_store()
4022            .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4023            .await?;
4024        Ok(data.and_then(|d| d.into_composer_draft()))
4025    }
4026
4027    /// Remove the `ComposerDraft` stored in the state store for this room
4028    /// and given thread, if any.
4029    pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
4030        self.client
4031            .state_store()
4032            .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4033            .await?;
4034        Ok(())
4035    }
4036
4037    /// Load pinned state events for a room from the `/state` endpoint in the
4038    /// home server.
4039    pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
4040        let response = self
4041            .client
4042            .send(get_state_event_for_key::v3::Request::new(
4043                self.room_id().to_owned(),
4044                StateEventType::RoomPinnedEvents,
4045                "".to_owned(),
4046            ))
4047            .await;
4048
4049        match response {
4050            Ok(response) => Ok(Some(
4051                response
4052                    .into_content()
4053                    .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
4054                    .pinned,
4055            )),
4056            Err(http_error) => match http_error.as_client_api_error() {
4057                Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
4058                _ => Err(http_error.into()),
4059            },
4060        }
4061    }
4062
4063    /// Observe live location sharing events for this room.
4064    ///
4065    /// The returned observable will receive the newest event for each sync
4066    /// response that contains an `m.beacon` event.
4067    ///
4068    /// Returns a stream of [`ObservableLiveLocation`] events from other users
4069    /// in the room, excluding the live location events of the room's own user.
4070    pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
4071        ObservableLiveLocation::new(&self.client, self.room_id())
4072    }
4073
4074    /// Subscribe to knock requests in this `Room`.
4075    ///
4076    /// The current requests to join the room will be emitted immediately
4077    /// when subscribing.
4078    ///
4079    /// A new set of knock requests will be emitted whenever:
4080    /// - A new member event is received.
4081    /// - A knock request is marked as seen.
4082    /// - A sync is gappy (limited), so room membership information may be
4083    ///   outdated.
4084    ///
4085    /// Returns both a stream of knock requests and a handle for a task that
4086    /// will clean up the seen knock request ids when possible.
4087    pub async fn subscribe_to_knock_requests(
4088        &self,
4089    ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
4090        let this = Arc::new(self.clone());
4091
4092        let room_member_events_observer =
4093            self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
4094
4095        let current_seen_ids = self.get_seen_knock_request_ids().await?;
4096        let mut seen_request_ids_stream = self
4097            .seen_knock_request_ids_map
4098            .subscribe()
4099            .await
4100            .map(|values| values.unwrap_or_default());
4101
4102        let mut room_info_stream = self.subscribe_info();
4103
4104        // Spawn a task that will clean up the seen knock request ids when updated room
4105        // members are received
4106        let clear_seen_ids_handle = spawn({
4107            let this = self.clone();
4108            async move {
4109                let mut member_updates_stream = this.room_member_updates_sender.subscribe();
4110                while member_updates_stream.recv().await.is_ok() {
4111                    // If room members were updated, try to remove outdated seen knock request ids
4112                    if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
4113                        warn!("Failed to remove seen knock requests: {err}")
4114                    }
4115                }
4116            }
4117        });
4118
4119        let combined_stream = stream! {
4120            // Emit current requests to join
4121            match this.get_current_join_requests(&current_seen_ids).await {
4122                Ok(initial_requests) => yield initial_requests,
4123                Err(err) => warn!("Failed to get initial requests to join: {err}")
4124            }
4125
4126            let mut requests_stream = room_member_events_observer.subscribe();
4127            let mut seen_ids = current_seen_ids.clone();
4128
4129            loop {
4130                // This is equivalent to a combine stream operation, triggering a new emission
4131                // when any of the branches changes
4132                tokio::select! {
4133                    Some((event, _)) = requests_stream.next() => {
4134                        if let Some(event) = event.as_original() {
4135                            // If we can calculate the membership change, try to emit only when needed
4136                            let emit = if event.prev_content().is_some() {
4137                                matches!(event.membership_change(),
4138                                    MembershipChange::Banned |
4139                                    MembershipChange::Knocked |
4140                                    MembershipChange::KnockAccepted |
4141                                    MembershipChange::KnockDenied |
4142                                    MembershipChange::KnockRetracted
4143                                )
4144                            } else {
4145                                // If we can't calculate the membership change, assume we need to
4146                                // emit updated values
4147                                true
4148                            };
4149
4150                            if emit {
4151                                match this.get_current_join_requests(&seen_ids).await {
4152                                    Ok(requests) => yield requests,
4153                                    Err(err) => {
4154                                        warn!("Failed to get updated knock requests on new member event: {err}")
4155                                    }
4156                                }
4157                            }
4158                        }
4159                    }
4160
4161                    Some(new_seen_ids) = seen_request_ids_stream.next() => {
4162                        // Update the current seen ids
4163                        seen_ids = new_seen_ids;
4164
4165                        // If seen requests have changed we need to recalculate
4166                        // all the knock requests
4167                        match this.get_current_join_requests(&seen_ids).await {
4168                            Ok(requests) => yield requests,
4169                            Err(err) => {
4170                                warn!("Failed to get updated knock requests on seen ids changed: {err}")
4171                            }
4172                        }
4173                    }
4174
4175                    Some(room_info) = room_info_stream.next() => {
4176                        // We need to emit new items when we may have missing room members:
4177                        // this usually happens after a gappy (limited) sync
4178                        if !room_info.are_members_synced() {
4179                            match this.get_current_join_requests(&seen_ids).await {
4180                                Ok(requests) => yield requests,
4181                                Err(err) => {
4182                                    warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4183                                }
4184                            }
4185                        }
4186                    }
4187                    // If the streams in all branches are closed, stop the loop
4188                    else => break,
4189                }
4190            }
4191        };
4192
4193        Ok((combined_stream, clear_seen_ids_handle))
4194    }
4195
4196    async fn get_current_join_requests(
4197        &self,
4198        seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4199    ) -> Result<Vec<KnockRequest>> {
4200        Ok(self
4201            .members(RoomMemberships::KNOCK)
4202            .await?
4203            .into_iter()
4204            .filter_map(|member| {
4205                let event_id = member.event().event_id()?;
4206                Some(KnockRequest::new(
4207                    self,
4208                    event_id,
4209                    member.event().timestamp(),
4210                    KnockRequestMemberInfo::from_member(&member),
4211                    seen_request_ids.contains_key(event_id),
4212                ))
4213            })
4214            .collect())
4215    }
4216
4217    /// Access the room settings related to privacy and visibility.
4218    pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4219        RoomPrivacySettings::new(&self.inner, &self.client)
4220    }
4221
4222    /// Retrieve a list of all the threads for the current room.
4223    ///
4224    /// Since this client-server API is paginated, the return type may include a
4225    /// token used to resuming back-pagination into the list of results, in
4226    /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4227    /// [`ListThreadsOptions::from`] to continue the pagination
4228    /// from the previous position.
4229    pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4230        let request = opts.into_request(self.room_id());
4231
4232        let response = self.client.send(request).await?;
4233
4234        let push_ctx = self.push_context().await?;
4235        let chunk = join_all(
4236            response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4237        )
4238        .await;
4239
4240        Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4241    }
4242
4243    /// Retrieve a list of relations for the given event, according to the given
4244    /// options, using the network.
4245    ///
4246    /// Since this client-server API is paginated, the return type may include a
4247    /// token used to resuming back-pagination into the list of results, in
4248    /// [`Relations::prev_batch_token`]. This token can be fed back into
4249    /// [`RelationsOptions::from`] to continue the pagination from the previous
4250    /// position.
4251    ///
4252    /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4253    /// then it must be used with the same
4254    /// [`RelationsOptions::include_relations`] value as the request that
4255    /// returns the `from` token, otherwise the server behavior is undefined.
4256    pub async fn relations(
4257        &self,
4258        event_id: OwnedEventId,
4259        opts: RelationsOptions,
4260    ) -> Result<Relations> {
4261        let relations = opts.send(self, event_id).await;
4262
4263        // Save any new related events to the cache.
4264        if let Ok(Relations { chunk, .. }) = &relations
4265            && let Ok((cache, _handles)) = self.event_cache().await
4266        {
4267            cache.save_events(chunk.clone()).await;
4268        }
4269
4270        relations
4271    }
4272
4273    /// Search this room's [`RoomIndex`] for query and return at most
4274    /// max_number_of_results results.
4275    #[cfg(feature = "experimental-search")]
4276    pub async fn search(
4277        &self,
4278        query: &str,
4279        max_number_of_results: usize,
4280        pagination_offset: Option<usize>,
4281    ) -> Result<Vec<OwnedEventId>, IndexError> {
4282        let mut search_index_guard = self.client.search_index().lock().await;
4283        search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4284    }
4285
4286    /// Subscribe to a given thread in this room.
4287    ///
4288    /// This will subscribe the user to the thread, so that they will receive
4289    /// notifications for that thread specifically.
4290    ///
4291    /// # Arguments
4292    ///
4293    /// - `thread_root`: The ID of the thread root event to subscribe to.
4294    /// - `automatic`: Whether the subscription was made automatically by a
4295    ///   client, not by manual user choice. If set, must include the latest
4296    ///   event ID that's known in the thread and that is causing the automatic
4297    ///   subscription. If unset (i.e. we're now subscribing manually) and there
4298    ///   was a previous automatic subscription, the subscription will be
4299    ///   overridden to a manual one instead.
4300    ///
4301    /// # Returns
4302    ///
4303    /// - A 404 error if the event isn't known, or isn't a thread root.
4304    /// - An `Ok` result if the subscription was successful, or if the server
4305    ///   skipped an automatic subscription (as the user unsubscribed from the
4306    ///   thread after the event causing the automatic subscription).
4307    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4308    pub async fn subscribe_thread(
4309        &self,
4310        thread_root: OwnedEventId,
4311        automatic: Option<OwnedEventId>,
4312    ) -> Result<()> {
4313        let is_automatic = automatic.is_some();
4314
4315        match self
4316            .client
4317            .send(subscribe_thread::unstable::Request::new(
4318                self.room_id().to_owned(),
4319                thread_root.clone(),
4320                automatic,
4321            ))
4322            .await
4323        {
4324            Ok(_response) => {
4325                trace!("Server acknowledged the thread subscription; saving in db");
4326
4327                // Immediately save the result into the database.
4328                self.client
4329                    .state_store()
4330                    .upsert_thread_subscriptions(vec![(
4331                        self.room_id(),
4332                        &thread_root,
4333                        StoredThreadSubscription {
4334                            status: ThreadSubscriptionStatus::Subscribed {
4335                                automatic: is_automatic,
4336                            },
4337                            bump_stamp: None,
4338                        },
4339                    )])
4340                    .await?;
4341
4342                Ok(())
4343            }
4344
4345            Err(err) => {
4346                if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4347                    // In this case: the server indicates that the user unsubscribed *after* the
4348                    // event ID we've used in an automatic subscription; don't
4349                    // save the subscription state in the database, as the
4350                    // previous one should be more correct.
4351                    trace!("Thread subscription skipped: {err}");
4352                    Ok(())
4353                } else {
4354                    // Forward the error to the caller.
4355                    Err(err.into())
4356                }
4357            }
4358        }
4359    }
4360
4361    /// Subscribe to a thread if needed, based on a current subscription to it.
4362    ///
4363    /// This is like [`Self::subscribe_thread`], but it first checks if the user
4364    /// has already subscribed to a thread, so as to minimize sending
4365    /// unnecessary subscriptions which would be ignored by the server.
4366    pub async fn subscribe_thread_if_needed(
4367        &self,
4368        thread_root: &EventId,
4369        automatic: Option<OwnedEventId>,
4370    ) -> Result<()> {
4371        if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4372            // If we have a previous subscription, we should only send the new one if it's
4373            // manual and the previous one was automatic.
4374            if !prev_sub.automatic || automatic.is_some() {
4375                // Either we had already a manual subscription, or we had an automatic one and
4376                // the new one is automatic too: nothing to do!
4377                return Ok(());
4378            }
4379        }
4380        self.subscribe_thread(thread_root.to_owned(), automatic).await
4381    }
4382
4383    /// Unsubscribe from a given thread in this room.
4384    ///
4385    /// # Arguments
4386    ///
4387    /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4388    ///
4389    /// # Returns
4390    ///
4391    /// - An `Ok` result if the unsubscription was successful, or the thread was
4392    ///   already unsubscribed.
4393    /// - A 404 error if the event isn't known, or isn't a thread root.
4394    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4395    pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4396        self.client
4397            .send(unsubscribe_thread::unstable::Request::new(
4398                self.room_id().to_owned(),
4399                thread_root.clone(),
4400            ))
4401            .await?;
4402
4403        trace!("Server acknowledged the thread subscription removal; removed it from db too");
4404
4405        // Immediately save the result into the database.
4406        self.client
4407            .state_store()
4408            .upsert_thread_subscriptions(vec![(
4409                self.room_id(),
4410                &thread_root,
4411                StoredThreadSubscription {
4412                    status: ThreadSubscriptionStatus::Unsubscribed,
4413                    bump_stamp: None,
4414                },
4415            )])
4416            .await?;
4417
4418        Ok(())
4419    }
4420
4421    /// Return the current thread subscription for the given thread root in this
4422    /// room.
4423    ///
4424    /// # Arguments
4425    ///
4426    /// - `thread_root`: The ID of the thread root event to get the subscription
4427    ///   for.
4428    ///
4429    /// # Returns
4430    ///
4431    /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4432    ///   subscription information.
4433    /// - An `Ok` result with `None` if the subscription does not exist, or the
4434    ///   event couldn't be found, or the event isn't a thread.
4435    /// - An error if the request fails for any other reason, such as a network
4436    ///   error.
4437    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4438    pub async fn fetch_thread_subscription(
4439        &self,
4440        thread_root: OwnedEventId,
4441    ) -> Result<Option<ThreadSubscription>> {
4442        let result = self
4443            .client
4444            .send(get_thread_subscription::unstable::Request::new(
4445                self.room_id().to_owned(),
4446                thread_root.clone(),
4447            ))
4448            .await;
4449
4450        let subscription = match result {
4451            Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4452            Err(http_error) => match http_error.as_client_api_error() {
4453                Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4454                _ => return Err(http_error.into()),
4455            },
4456        };
4457
4458        // Keep the database in sync.
4459        if let Some(sub) = &subscription {
4460            self.client
4461                .state_store()
4462                .upsert_thread_subscriptions(vec![(
4463                    self.room_id(),
4464                    &thread_root,
4465                    StoredThreadSubscription {
4466                        status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4467                        bump_stamp: None,
4468                    },
4469                )])
4470                .await?;
4471        } else {
4472            // If the subscription was not found, remove it from the database.
4473            self.client
4474                .state_store()
4475                .remove_thread_subscription(self.room_id(), &thread_root)
4476                .await?;
4477        }
4478
4479        Ok(subscription)
4480    }
4481
4482    /// Return the current thread subscription for the given thread root in this
4483    /// room, by getting it from storage if possible, or fetching it from
4484    /// network otherwise.
4485    ///
4486    /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4487    /// this method.
4488    pub async fn load_or_fetch_thread_subscription(
4489        &self,
4490        thread_root: &EventId,
4491    ) -> Result<Option<ThreadSubscription>> {
4492        // If the thread subscriptions list is outdated, fetch from the server.
4493        if self.client.thread_subscription_catchup().is_outdated() {
4494            return self.fetch_thread_subscription(thread_root.to_owned()).await;
4495        }
4496
4497        // Otherwise, we can rely on the store information.
4498        Ok(self
4499            .client
4500            .state_store()
4501            .load_thread_subscription(self.room_id(), thread_root)
4502            .await
4503            .map(|maybe_sub| {
4504                maybe_sub.and_then(|stored| match stored.status {
4505                    ThreadSubscriptionStatus::Unsubscribed => None,
4506                    ThreadSubscriptionStatus::Subscribed { automatic } => {
4507                        Some(ThreadSubscription { automatic })
4508                    }
4509                })
4510            })?)
4511    }
4512
4513    /// Adds a new pinned event by sending an updated `m.room.pinned_events`
4514    /// event containing the new event id.
4515    ///
4516    /// This method will first try to get the pinned events from the current
4517    /// room's state and if it fails to do so it'll try to load them from the
4518    /// homeserver.
4519    ///
4520    /// Returns `true` if we pinned the event, `false` if the event was already
4521    /// pinned.
4522    pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
4523        let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4524            event_ids
4525        } else {
4526            self.load_pinned_events().await?.unwrap_or_default()
4527        };
4528        let event_id = event_id.to_owned();
4529        if pinned_event_ids.contains(&event_id) {
4530            Ok(false)
4531        } else {
4532            pinned_event_ids.push(event_id);
4533            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4534            self.send_state_event(content).await?;
4535            Ok(true)
4536        }
4537    }
4538
4539    /// Removes a pinned event by sending an updated `m.room.pinned_events`
4540    /// event without the event id we want to remove.
4541    ///
4542    /// This method will first try to get the pinned events from the current
4543    /// room's state and if it fails to do so it'll try to load them from the
4544    /// homeserver.
4545    ///
4546    /// Returns `true` if we unpinned the event, `false` if the event wasn't
4547    /// pinned before.
4548    pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
4549        let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4550            event_ids
4551        } else {
4552            self.load_pinned_events().await?.unwrap_or_default()
4553        };
4554        let event_id = event_id.to_owned();
4555        if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
4556            pinned_event_ids.remove(idx);
4557            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4558            self.send_state_event(content).await?;
4559            Ok(true)
4560        } else {
4561            Ok(false)
4562        }
4563    }
4564}
4565
4566#[cfg(feature = "e2e-encryption")]
4567impl RoomIdentityProvider for Room {
4568    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
4569        Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
4570    }
4571
4572    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
4573        Box::pin(async {
4574            let members = self
4575                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
4576                .await
4577                .unwrap_or_else(|_| Default::default());
4578
4579            let mut ret: Vec<UserIdentity> = Vec::new();
4580            for member in members {
4581                if let Some(i) = self.user_identity(member.user_id()).await {
4582                    ret.push(i);
4583                }
4584            }
4585            ret
4586        })
4587    }
4588
4589    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
4590        Box::pin(async {
4591            self.client
4592                .encryption()
4593                .get_user_identity(user_id)
4594                .await
4595                .unwrap_or(None)
4596                .map(|u| u.underlying_identity())
4597        })
4598    }
4599}
4600
4601/// A wrapper for a weak client and a room id that allows to lazily retrieve a
4602/// room, only when needed.
4603#[derive(Clone, Debug)]
4604pub(crate) struct WeakRoom {
4605    client: WeakClient,
4606    room_id: OwnedRoomId,
4607}
4608
4609impl WeakRoom {
4610    /// Create a new `WeakRoom` given its weak components.
4611    pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4612        Self { client, room_id }
4613    }
4614
4615    /// Attempts to reconstruct the room.
4616    pub fn get(&self) -> Option<Room> {
4617        self.client.get().and_then(|client| client.get_room(&self.room_id))
4618    }
4619
4620    /// The room id for that room.
4621    pub fn room_id(&self) -> &RoomId {
4622        &self.room_id
4623    }
4624}
4625
4626/// Details of the (latest) invite.
4627#[derive(Debug, Clone)]
4628pub struct Invite {
4629    /// Who has been invited.
4630    pub invitee: RoomMember,
4631
4632    /// The user ID of who sent the invite.
4633    ///
4634    /// This is useful if `Self::inviter` is `None`.
4635    pub inviter_id: OwnedUserId,
4636
4637    /// Who sent the invite.
4638    ///
4639    /// If `None`, check `Self::inviter_id`, it might be useful as a fallback.
4640    pub inviter: Option<RoomMember>,
4641}
4642
4643#[derive(Error, Debug)]
4644enum InvitationError {
4645    #[error("No membership event found")]
4646    EventMissing,
4647}
4648
4649/// Receipts to send all at once.
4650#[derive(Debug, Clone, Default)]
4651#[non_exhaustive]
4652pub struct Receipts {
4653    /// Fully-read marker (room account data).
4654    pub fully_read: Option<OwnedEventId>,
4655    /// Read receipt (public ephemeral room event).
4656    pub public_read_receipt: Option<OwnedEventId>,
4657    /// Read receipt (private ephemeral room event).
4658    pub private_read_receipt: Option<OwnedEventId>,
4659}
4660
4661impl Receipts {
4662    /// Create an empty `Receipts`.
4663    pub fn new() -> Self {
4664        Self::default()
4665    }
4666
4667    /// Set the last event the user has read.
4668    ///
4669    /// It means that the user has read all the events before this event.
4670    ///
4671    /// This is a private marker only visible by the user.
4672    ///
4673    /// Note that this is technically not a receipt as it is persisted in the
4674    /// room account data.
4675    pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4676        self.fully_read = event_id.into();
4677        self
4678    }
4679
4680    /// Set the last event presented to the user and forward it to the other
4681    /// users in the room.
4682    ///
4683    /// This is used to reset the unread messages/notification count and
4684    /// advertise to other users the last event that the user has likely seen.
4685    pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4686        self.public_read_receipt = event_id.into();
4687        self
4688    }
4689
4690    /// Set the last event presented to the user and don't forward it.
4691    ///
4692    /// This is used to reset the unread messages/notification count.
4693    pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4694        self.private_read_receipt = event_id.into();
4695        self
4696    }
4697
4698    /// Whether this `Receipts` is empty.
4699    pub fn is_empty(&self) -> bool {
4700        self.fully_read.is_none()
4701            && self.public_read_receipt.is_none()
4702            && self.private_read_receipt.is_none()
4703    }
4704}
4705
4706/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
4707/// listed by a room, possibly validated by checking the space's state.
4708#[derive(Debug)]
4709pub enum ParentSpace {
4710    /// The room recognizes the given room as its parent, and the parent
4711    /// recognizes it as its child.
4712    Reciprocal(Room),
4713    /// The room recognizes the given room as its parent, but the parent does
4714    /// not recognizes it as its child. However, the author of the
4715    /// `m.space.parent` event in the room has a sufficient power level in the
4716    /// parent to create the child event.
4717    WithPowerlevel(Room),
4718    /// The room recognizes the given room as its parent, but the parent does
4719    /// not recognizes it as its child.
4720    Illegitimate(Room),
4721    /// The room recognizes the given id as its parent room, but we cannot check
4722    /// whether the parent recognizes it as its child.
4723    Unverifiable(OwnedRoomId),
4724}
4725
4726/// The score to rate an inappropriate content.
4727///
4728/// Must be a value between `0`, inoffensive, and `-100`, very offensive.
4729#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
4730pub struct ReportedContentScore(i8);
4731
4732impl ReportedContentScore {
4733    /// The smallest value that can be represented by this type.
4734    ///
4735    /// This is for very offensive content.
4736    pub const MIN: Self = Self(-100);
4737
4738    /// The largest value that can be represented by this type.
4739    ///
4740    /// This is for inoffensive content.
4741    pub const MAX: Self = Self(0);
4742
4743    /// Try to create a `ReportedContentScore` from the provided `i8`.
4744    ///
4745    /// Returns `None` if it is smaller than [`ReportedContentScore::MIN`] or
4746    /// larger than [`ReportedContentScore::MAX`] .
4747    ///
4748    /// This is the same as the `TryFrom<i8>` implementation for
4749    /// `ReportedContentScore`, except that it returns an `Option` instead
4750    /// of a `Result`.
4751    pub fn new(value: i8) -> Option<Self> {
4752        value.try_into().ok()
4753    }
4754
4755    /// Create a `ReportedContentScore` from the provided `i8` clamped to the
4756    /// acceptable interval.
4757    ///
4758    /// The given value gets clamped into the closed interval between
4759    /// [`ReportedContentScore::MIN`] and [`ReportedContentScore::MAX`].
4760    pub fn new_saturating(value: i8) -> Self {
4761        if value > Self::MAX {
4762            Self::MAX
4763        } else if value < Self::MIN {
4764            Self::MIN
4765        } else {
4766            Self(value)
4767        }
4768    }
4769
4770    /// The value of this score.
4771    pub fn value(&self) -> i8 {
4772        self.0
4773    }
4774}
4775
4776impl PartialEq<i8> for ReportedContentScore {
4777    fn eq(&self, other: &i8) -> bool {
4778        self.0.eq(other)
4779    }
4780}
4781
4782impl PartialEq<ReportedContentScore> for i8 {
4783    fn eq(&self, other: &ReportedContentScore) -> bool {
4784        self.eq(&other.0)
4785    }
4786}
4787
4788impl PartialOrd<i8> for ReportedContentScore {
4789    fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4790        self.0.partial_cmp(other)
4791    }
4792}
4793
4794impl PartialOrd<ReportedContentScore> for i8 {
4795    fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4796        self.partial_cmp(&other.0)
4797    }
4798}
4799
4800impl From<ReportedContentScore> for Int {
4801    fn from(value: ReportedContentScore) -> Self {
4802        value.0.into()
4803    }
4804}
4805
4806impl TryFrom<i8> for ReportedContentScore {
4807    type Error = TryFromReportedContentScoreError;
4808
4809    fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4810        if value > Self::MAX || value < Self::MIN {
4811            Err(TryFromReportedContentScoreError(()))
4812        } else {
4813            Ok(Self(value))
4814        }
4815    }
4816}
4817
4818impl TryFrom<i16> for ReportedContentScore {
4819    type Error = TryFromReportedContentScoreError;
4820
4821    fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4822        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4823        value.try_into()
4824    }
4825}
4826
4827impl TryFrom<i32> for ReportedContentScore {
4828    type Error = TryFromReportedContentScoreError;
4829
4830    fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4831        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4832        value.try_into()
4833    }
4834}
4835
4836impl TryFrom<i64> for ReportedContentScore {
4837    type Error = TryFromReportedContentScoreError;
4838
4839    fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4840        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4841        value.try_into()
4842    }
4843}
4844
4845impl TryFrom<Int> for ReportedContentScore {
4846    type Error = TryFromReportedContentScoreError;
4847
4848    fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4849        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4850        value.try_into()
4851    }
4852}
4853
4854trait EventSource {
4855    fn get_event(
4856        &self,
4857        event_id: &EventId,
4858    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4859}
4860
4861impl EventSource for &Room {
4862    async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4863        self.load_or_fetch_event(event_id, None).await
4864    }
4865}
4866
4867/// The error type returned when a checked `ReportedContentScore` conversion
4868/// fails.
4869#[derive(Debug, Clone, Error)]
4870#[error("out of range conversion attempted")]
4871pub struct TryFromReportedContentScoreError(());
4872
4873/// Contains the current user's room member info and the optional room member
4874/// info of the sender of the `m.room.member` event that this info represents.
4875#[derive(Debug)]
4876pub struct RoomMemberWithSenderInfo {
4877    /// The actual room member.
4878    pub room_member: RoomMember,
4879    /// The info of the sender of the event `room_member` is based on, if
4880    /// available.
4881    pub sender_info: Option<RoomMember>,
4882}
4883
4884#[cfg(all(test, not(target_family = "wasm")))]
4885mod tests {
4886    use std::collections::BTreeMap;
4887
4888    use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4889    use matrix_sdk_test::{
4890        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
4891    };
4892    use ruma::{
4893        RoomVersionId, event_id,
4894        events::{relation::RelationType, room::member::MembershipState},
4895        int, owned_event_id, room_id, user_id,
4896    };
4897    use wiremock::{
4898        Mock, MockServer, ResponseTemplate,
4899        matchers::{header, method, path_regex},
4900    };
4901
4902    use super::ReportedContentScore;
4903    use crate::{
4904        Client,
4905        config::RequestConfig,
4906        room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4907        test_utils::{
4908            client::mock_matrix_session,
4909            logged_in_client,
4910            mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4911        },
4912    };
4913
4914    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4915    #[async_test]
4916    async fn test_cache_invalidation_while_encrypt() {
4917        use matrix_sdk_base::store::RoomLoadSettings;
4918        use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4919
4920        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4921        let session = mock_matrix_session();
4922
4923        let client = Client::builder()
4924            .homeserver_url("http://localhost:1234")
4925            .request_config(RequestConfig::new().disable_retry())
4926            .sqlite_store(&sqlite_path, None)
4927            .build()
4928            .await
4929            .unwrap();
4930        client
4931            .matrix_auth()
4932            .restore_session(session.clone(), RoomLoadSettings::default())
4933            .await
4934            .unwrap();
4935
4936        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4937
4938        // Mock receiving an event to create an internal room.
4939        let server = MockServer::start().await;
4940        {
4941            Mock::given(method("GET"))
4942                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4943                .and(header("authorization", "Bearer 1234"))
4944                .respond_with(
4945                    ResponseTemplate::new(200)
4946                        .set_body_json(EventFactory::new().room_encryption().into_content()),
4947                )
4948                .mount(&server)
4949                .await;
4950            let f = EventFactory::new().sender(user_id!("@example:localhost"));
4951            let response = SyncResponseBuilder::default()
4952                .add_joined_room(
4953                    JoinedRoomBuilder::default()
4954                        .add_state_event(
4955                            f.member(user_id!("@example:localhost")).display_name("example"),
4956                        )
4957                        .add_state_event(f.default_power_levels())
4958                        .add_state_event(f.room_encryption()),
4959                )
4960                .build_sync_response();
4961            client.base_client().receive_sync_response(response).await.unwrap();
4962        }
4963
4964        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4965
4966        // Step 1, preshare the room keys.
4967        room.preshare_room_key().await.unwrap();
4968
4969        // Step 2, force lock invalidation by pretending another client obtained the
4970        // lock.
4971        {
4972            let client = Client::builder()
4973                .homeserver_url("http://localhost:1234")
4974                .request_config(RequestConfig::new().disable_retry())
4975                .sqlite_store(&sqlite_path, None)
4976                .build()
4977                .await
4978                .unwrap();
4979            client
4980                .matrix_auth()
4981                .restore_session(session.clone(), RoomLoadSettings::default())
4982                .await
4983                .unwrap();
4984            client
4985                .encryption()
4986                .enable_cross_process_store_lock("client2".to_owned())
4987                .await
4988                .unwrap();
4989
4990            let guard = client.encryption().spin_lock_store(None).await.unwrap();
4991            assert!(guard.is_some());
4992        }
4993
4994        // Step 3, take the crypto-store lock.
4995        let guard = client.encryption().spin_lock_store(None).await.unwrap();
4996        assert!(guard.is_some());
4997
4998        // Step 4, try to encrypt a message.
4999        let olm = client.olm_machine().await;
5000        let olm = olm.as_ref().expect("Olm machine wasn't started");
5001
5002        // Now pretend we're encrypting an event; the olm machine shouldn't rely on
5003        // caching the outgoing session before.
5004        let _encrypted_content = olm
5005            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
5006            .await
5007            .unwrap();
5008    }
5009
5010    #[test]
5011    fn reported_content_score() {
5012        // i8
5013        let score = ReportedContentScore::new(0).unwrap();
5014        assert_eq!(score.value(), 0);
5015        let score = ReportedContentScore::new(-50).unwrap();
5016        assert_eq!(score.value(), -50);
5017        let score = ReportedContentScore::new(-100).unwrap();
5018        assert_eq!(score.value(), -100);
5019        assert_eq!(ReportedContentScore::new(10), None);
5020        assert_eq!(ReportedContentScore::new(-110), None);
5021
5022        let score = ReportedContentScore::new_saturating(0);
5023        assert_eq!(score.value(), 0);
5024        let score = ReportedContentScore::new_saturating(-50);
5025        assert_eq!(score.value(), -50);
5026        let score = ReportedContentScore::new_saturating(-100);
5027        assert_eq!(score.value(), -100);
5028        let score = ReportedContentScore::new_saturating(10);
5029        assert_eq!(score, ReportedContentScore::MAX);
5030        let score = ReportedContentScore::new_saturating(-110);
5031        assert_eq!(score, ReportedContentScore::MIN);
5032
5033        // i16
5034        let score = ReportedContentScore::try_from(0i16).unwrap();
5035        assert_eq!(score.value(), 0);
5036        let score = ReportedContentScore::try_from(-100i16).unwrap();
5037        assert_eq!(score.value(), -100);
5038        ReportedContentScore::try_from(10i16).unwrap_err();
5039        ReportedContentScore::try_from(-110i16).unwrap_err();
5040
5041        // i32
5042        let score = ReportedContentScore::try_from(0i32).unwrap();
5043        assert_eq!(score.value(), 0);
5044        let score = ReportedContentScore::try_from(-100i32).unwrap();
5045        assert_eq!(score.value(), -100);
5046        ReportedContentScore::try_from(10i32).unwrap_err();
5047        ReportedContentScore::try_from(-110i32).unwrap_err();
5048
5049        // i64
5050        let score = ReportedContentScore::try_from(0i64).unwrap();
5051        assert_eq!(score.value(), 0);
5052        let score = ReportedContentScore::try_from(-100i64).unwrap();
5053        assert_eq!(score.value(), -100);
5054        ReportedContentScore::try_from(10i64).unwrap_err();
5055        ReportedContentScore::try_from(-110i64).unwrap_err();
5056
5057        // Int
5058        let score = ReportedContentScore::try_from(int!(0)).unwrap();
5059        assert_eq!(score.value(), 0);
5060        let score = ReportedContentScore::try_from(int!(-100)).unwrap();
5061        assert_eq!(score.value(), -100);
5062        ReportedContentScore::try_from(int!(10)).unwrap_err();
5063        ReportedContentScore::try_from(int!(-110)).unwrap_err();
5064    }
5065
5066    #[async_test]
5067    async fn test_composer_draft() {
5068        use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
5069
5070        let client = logged_in_client(None).await;
5071
5072        let response = SyncResponseBuilder::default()
5073            .add_joined_room(JoinedRoomBuilder::default())
5074            .build_sync_response();
5075        client.base_client().receive_sync_response(response).await.unwrap();
5076        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
5077
5078        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
5079
5080        // Save 2 drafts, one for the room and one for a thread.
5081
5082        let draft = ComposerDraft {
5083            plain_text: "Hello, world!".to_owned(),
5084            html_text: Some("<strong>Hello</strong>, world!".to_owned()),
5085            draft_type: ComposerDraftType::NewMessage,
5086            attachments: vec![DraftAttachment {
5087                filename: "cat.txt".to_owned(),
5088                content: matrix_sdk_base::DraftAttachmentContent::File {
5089                    data: b"meow".to_vec(),
5090                    mimetype: Some("text/plain".to_owned()),
5091                    size: Some(5),
5092                },
5093            }],
5094        };
5095
5096        room.save_composer_draft(draft.clone(), None).await.unwrap();
5097
5098        let thread_root = owned_event_id!("$thread_root:b.c");
5099        let thread_draft = ComposerDraft {
5100            plain_text: "Hello, thread!".to_owned(),
5101            html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
5102            draft_type: ComposerDraftType::NewMessage,
5103            attachments: vec![DraftAttachment {
5104                filename: "dog.txt".to_owned(),
5105                content: matrix_sdk_base::DraftAttachmentContent::File {
5106                    data: b"wuv".to_vec(),
5107                    mimetype: Some("text/plain".to_owned()),
5108                    size: Some(4),
5109                },
5110            }],
5111        };
5112
5113        room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
5114
5115        // Check that the room draft was saved correctly
5116        assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
5117
5118        // Check that the thread draft was saved correctly
5119        assert_eq!(
5120            room.load_composer_draft(Some(&thread_root)).await.unwrap(),
5121            Some(thread_draft.clone())
5122        );
5123
5124        // Clear the room draft
5125        room.clear_composer_draft(None).await.unwrap();
5126        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
5127
5128        // Check that the thread one is still there
5129        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
5130
5131        // Clear the thread draft as well
5132        room.clear_composer_draft(Some(&thread_root)).await.unwrap();
5133        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
5134    }
5135
5136    #[async_test]
5137    async fn test_mark_join_requests_as_seen() {
5138        let server = MatrixMockServer::new().await;
5139        let client = server.client_builder().build().await;
5140        let event_id = event_id!("$a:b.c");
5141        let room_id = room_id!("!a:b.c");
5142        let user_id = user_id!("@alice:b.c");
5143
5144        let f = EventFactory::new().room(room_id);
5145        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5146            f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
5147        ]);
5148        let room = server.sync_room(&client, joined_room_builder).await;
5149
5150        // When loading the initial seen ids, there are none
5151        let seen_ids =
5152            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
5153        assert!(seen_ids.is_empty());
5154
5155        // We mark a random event id as seen
5156        room.mark_knock_requests_as_seen(&[user_id.to_owned()])
5157            .await
5158            .expect("Couldn't mark join request as seen");
5159
5160        // Then we can check it was successfully marked as seen
5161        let seen_ids =
5162            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
5163        assert_eq!(seen_ids.len(), 1);
5164        assert_eq!(
5165            seen_ids.into_iter().next().expect("No next value"),
5166            (event_id.to_owned(), user_id.to_owned())
5167        )
5168    }
5169
5170    #[async_test]
5171    async fn test_own_room_membership_with_no_own_member_event() {
5172        let server = MatrixMockServer::new().await;
5173        let client = server.client_builder().build().await;
5174        let room_id = room_id!("!a:b.c");
5175
5176        let room = server.sync_joined_room(&client, room_id).await;
5177
5178        // Since there is no member event for the own user, the method fails.
5179        // This should never happen in an actual room.
5180        let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
5181        assert!(error.is_some());
5182    }
5183
5184    #[async_test]
5185    async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
5186        let server = MatrixMockServer::new().await;
5187        let client = server.client_builder().build().await;
5188        let room_id = room_id!("!a:b.c");
5189        let user_id = user_id!("@example:localhost");
5190
5191        let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5192        let joined_room_builder =
5193            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5194        let room = server.sync_room(&client, joined_room_builder).await;
5195
5196        // When we load the membership details
5197        let ret = room
5198            .member_with_sender_info(client.user_id().unwrap())
5199            .await
5200            .expect("Room member info should be available");
5201
5202        // We get the member info for the current user
5203        assert_eq!(ret.room_member.event().user_id(), user_id);
5204
5205        // But there is no info for the sender
5206        assert!(ret.sender_info.is_none());
5207    }
5208
5209    #[async_test]
5210    async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5211        let server = MatrixMockServer::new().await;
5212        let client = server.client_builder().build().await;
5213        let room_id = room_id!("!a:b.c");
5214        let user_id = user_id!("@example:localhost");
5215
5216        let f = EventFactory::new().room(room_id).sender(user_id);
5217        let joined_room_builder =
5218            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5219        let room = server.sync_room(&client, joined_room_builder).await;
5220
5221        // When we load the membership details
5222        let ret = room
5223            .member_with_sender_info(client.user_id().unwrap())
5224            .await
5225            .expect("Room member info should be available");
5226
5227        // We get the current user's member info
5228        assert_eq!(ret.room_member.event().user_id(), user_id);
5229
5230        // And the sender has the same info, since it's also the current user
5231        assert!(ret.sender_info.is_some());
5232        assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5233    }
5234
5235    #[async_test]
5236    async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5237        let server = MatrixMockServer::new().await;
5238        let client = server.client_builder().build().await;
5239        let room_id = room_id!("!a:b.c");
5240        let user_id = user_id!("@example:localhost");
5241        let sender_id = user_id!("@alice:b.c");
5242
5243        let f = EventFactory::new().room(room_id).sender(sender_id);
5244        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5245            f.member(user_id).into(),
5246            // The sender info comes from the sync
5247            f.member(sender_id).into(),
5248        ]);
5249        let room = server.sync_room(&client, joined_room_builder).await;
5250
5251        // When we load the membership details
5252        let ret = room
5253            .member_with_sender_info(client.user_id().unwrap())
5254            .await
5255            .expect("Room member info should be available");
5256
5257        // We get the current user's member info
5258        assert_eq!(ret.room_member.event().user_id(), user_id);
5259
5260        // And also the sender info from the events received in the sync
5261        assert!(ret.sender_info.is_some());
5262        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5263    }
5264
5265    #[async_test]
5266    async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5267        let server = MatrixMockServer::new().await;
5268        let client = server.client_builder().build().await;
5269        let room_id = room_id!("!a:b.c");
5270        let user_id = user_id!("@example:localhost");
5271        let sender_id = user_id!("@alice:b.c");
5272
5273        let f = EventFactory::new().room(room_id).sender(sender_id);
5274        let joined_room_builder =
5275            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5276        let room = server.sync_room(&client, joined_room_builder).await;
5277
5278        // We'll receive the member info through the /members endpoint
5279        server
5280            .mock_get_members()
5281            .ok(vec![f.member(sender_id).into_raw()])
5282            .mock_once()
5283            .mount()
5284            .await;
5285
5286        // We get the current user's member info
5287        let ret = room
5288            .member_with_sender_info(client.user_id().unwrap())
5289            .await
5290            .expect("Room member info should be available");
5291
5292        // We get the current user's member info
5293        assert_eq!(ret.room_member.event().user_id(), user_id);
5294
5295        // And also the sender info from the /members endpoint
5296        assert!(ret.sender_info.is_some());
5297        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5298    }
5299
5300    #[async_test]
5301    async fn test_list_threads() {
5302        let server = MatrixMockServer::new().await;
5303        let client = server.client_builder().build().await;
5304
5305        let room_id = room_id!("!a:b.c");
5306        let sender_id = user_id!("@alice:b.c");
5307        let f = EventFactory::new().room(room_id).sender(sender_id);
5308
5309        let eid1 = event_id!("$1");
5310        let eid2 = event_id!("$2");
5311        let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5312        let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5313
5314        server
5315            .mock_room_threads()
5316            .ok(batch1.clone(), Some("prev_batch".to_owned()))
5317            .mock_once()
5318            .mount()
5319            .await;
5320        server
5321            .mock_room_threads()
5322            .match_from("prev_batch")
5323            .ok(batch2, None)
5324            .mock_once()
5325            .mount()
5326            .await;
5327
5328        let room = server.sync_joined_room(&client, room_id).await;
5329        let result =
5330            room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5331        assert_eq!(result.chunk.len(), 1);
5332        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5333        assert!(result.prev_batch_token.is_some());
5334
5335        let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5336        let result = room.list_threads(opts).await.expect("Failed to list threads");
5337        assert_eq!(result.chunk.len(), 1);
5338        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5339        assert!(result.prev_batch_token.is_none());
5340    }
5341
5342    #[async_test]
5343    async fn test_relations() {
5344        let server = MatrixMockServer::new().await;
5345        let client = server.client_builder().build().await;
5346
5347        let room_id = room_id!("!a:b.c");
5348        let sender_id = user_id!("@alice:b.c");
5349        let f = EventFactory::new().room(room_id).sender(sender_id);
5350
5351        let target_event_id = owned_event_id!("$target");
5352        let eid1 = event_id!("$1");
5353        let eid2 = event_id!("$2");
5354        let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5355        let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5356
5357        server
5358            .mock_room_relations()
5359            .match_target_event(target_event_id.clone())
5360            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5361            .mock_once()
5362            .mount()
5363            .await;
5364
5365        server
5366            .mock_room_relations()
5367            .match_target_event(target_event_id.clone())
5368            .match_from("next_batch")
5369            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5370            .mock_once()
5371            .mount()
5372            .await;
5373
5374        let room = server.sync_joined_room(&client, room_id).await;
5375
5376        // Main endpoint: no relation type filtered out.
5377        let mut opts = RelationsOptions {
5378            include_relations: IncludeRelations::AllRelations,
5379            ..Default::default()
5380        };
5381        let result = room
5382            .relations(target_event_id.clone(), opts.clone())
5383            .await
5384            .expect("Failed to list relations the first time");
5385        assert_eq!(result.chunk.len(), 1);
5386        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5387        assert!(result.prev_batch_token.is_none());
5388        assert!(result.next_batch_token.is_some());
5389        assert!(result.recursion_depth.is_none());
5390
5391        opts.from = result.next_batch_token;
5392        let result = room
5393            .relations(target_event_id, opts)
5394            .await
5395            .expect("Failed to list relations the second time");
5396        assert_eq!(result.chunk.len(), 1);
5397        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5398        assert!(result.prev_batch_token.is_none());
5399        assert!(result.next_batch_token.is_none());
5400        assert!(result.recursion_depth.is_none());
5401    }
5402
5403    #[async_test]
5404    async fn test_relations_with_reltype() {
5405        let server = MatrixMockServer::new().await;
5406        let client = server.client_builder().build().await;
5407
5408        let room_id = room_id!("!a:b.c");
5409        let sender_id = user_id!("@alice:b.c");
5410        let f = EventFactory::new().room(room_id).sender(sender_id);
5411
5412        let target_event_id = owned_event_id!("$target");
5413        let eid1 = event_id!("$1");
5414        let eid2 = event_id!("$2");
5415        let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5416        let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5417
5418        server
5419            .mock_room_relations()
5420            .match_target_event(target_event_id.clone())
5421            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5422            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5423            .mock_once()
5424            .mount()
5425            .await;
5426
5427        server
5428            .mock_room_relations()
5429            .match_target_event(target_event_id.clone())
5430            .match_from("next_batch")
5431            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5432            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5433            .mock_once()
5434            .mount()
5435            .await;
5436
5437        let room = server.sync_joined_room(&client, room_id).await;
5438
5439        // Reltype-filtered endpoint, for threads \o/
5440        let mut opts = RelationsOptions {
5441            include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5442            ..Default::default()
5443        };
5444        let result = room
5445            .relations(target_event_id.clone(), opts.clone())
5446            .await
5447            .expect("Failed to list relations the first time");
5448        assert_eq!(result.chunk.len(), 1);
5449        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5450        assert!(result.prev_batch_token.is_none());
5451        assert!(result.next_batch_token.is_some());
5452        assert!(result.recursion_depth.is_none());
5453
5454        opts.from = result.next_batch_token;
5455        let result = room
5456            .relations(target_event_id, opts)
5457            .await
5458            .expect("Failed to list relations the second time");
5459        assert_eq!(result.chunk.len(), 1);
5460        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5461        assert!(result.prev_batch_token.is_none());
5462        assert!(result.next_batch_token.is_none());
5463        assert!(result.recursion_depth.is_none());
5464    }
5465
5466    #[async_test]
5467    async fn test_power_levels_computation() {
5468        let server = MatrixMockServer::new().await;
5469        let client = server.client_builder().build().await;
5470
5471        let room_id = room_id!("!a:b.c");
5472        let sender_id = client.user_id().expect("No session id");
5473        let f = EventFactory::new().room(room_id).sender(sender_id);
5474        let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5475
5476        // Computing the power levels will need these 3 state events:
5477        let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5478        let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5479        let room_member_event = f.member(sender_id).into();
5480
5481        // With only the room member event
5482        let room = server
5483            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5484            .await;
5485        let ctx = room
5486            .push_condition_room_ctx()
5487            .await
5488            .expect("Failed to get push condition context")
5489            .expect("Could not get push condition context");
5490
5491        // The internal power levels couldn't be computed
5492        assert!(ctx.power_levels.is_none());
5493
5494        // Adding the room creation event
5495        let room = server
5496            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5497            .await;
5498        let ctx = room
5499            .push_condition_room_ctx()
5500            .await
5501            .expect("Failed to get push condition context")
5502            .expect("Could not get push condition context");
5503
5504        // The internal power levels still couldn't be computed
5505        assert!(ctx.power_levels.is_none());
5506
5507        // With the room member, room creation and the power levels events
5508        let room = server
5509            .sync_room(
5510                &client,
5511                JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5512            )
5513            .await;
5514        let ctx = room
5515            .push_condition_room_ctx()
5516            .await
5517            .expect("Failed to get push condition context")
5518            .expect("Could not get push condition context");
5519
5520        // The internal power levels can finally be computed
5521        assert!(ctx.power_levels.is_some());
5522    }
5523}