Skip to main content

matrix_sdk/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18    borrow::Borrow,
19    collections::{BTreeMap, HashMap},
20    future::Future,
21    ops::Deref,
22    sync::Arc,
23    time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30    StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39    IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43    ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44    StateChanges, StateStoreDataKey, StateStoreDataValue,
45    deserialized_responses::{
46        RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47    },
48    media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49    serde_helpers::extract_relation,
50    store::{StateStoreExt, ThreadSubscriptionStatus},
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
54#[cfg(feature = "e2e-encryption")]
55use matrix_sdk_common::BoxFuture;
56use matrix_sdk_common::{
57    deserialized_responses::TimelineEvent,
58    executor::{JoinHandle, spawn},
59    timeout::timeout,
60};
61#[cfg(feature = "experimental-search")]
62use matrix_sdk_search::error::IndexError;
63#[cfg(feature = "experimental-search")]
64#[cfg(doc)]
65use matrix_sdk_search::index::RoomIndex;
66use mime::Mime;
67use reply::Reply;
68#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
69use ruma::events::AnySyncMessageLikeEvent;
70#[cfg(feature = "experimental-encrypted-state-events")]
71use ruma::events::AnySyncStateEvent;
72#[cfg(feature = "unstable-msc4274")]
73use ruma::events::room::message::GalleryItemType;
74#[cfg(feature = "e2e-encryption")]
75use ruma::events::{
76    AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
77};
78use ruma::{
79    EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
80    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
81    api::{
82        client::{
83            config::{set_global_account_data, set_room_account_data},
84            context,
85            filter::LazyLoadOptions,
86            membership::{
87                Invite3pid, ban_user, forget_room, get_member_events,
88                invite_user::{
89                    self,
90                    v3::{InvitationRecipient, InviteUserId},
91                },
92                kick_user, leave_room, unban_user,
93            },
94            message::send_message_event,
95            read_marker::set_read_marker,
96            receipt::create_receipt,
97            redact::redact_event,
98            room::{get_room_event, report_content, report_room},
99            state::{get_state_event_for_key, send_state_event},
100            tag::{create_tag, delete_tag},
101            threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
102            typing::create_typing_event::{
103                self,
104                v3::{Typing, TypingInfo},
105            },
106        },
107        error::ErrorKind,
108    },
109    assign,
110    events::{
111        AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
112        Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
113        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
114        RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
115        StaticStateEventContent, SyncStateEvent,
116        beacon::BeaconEventContent,
117        beacon_info::BeaconInfoEventContent,
118        direct::DirectEventContent,
119        marked_unread::MarkedUnreadEventContent,
120        receipt::{Receipt, ReceiptThread, ReceiptType},
121        relation::RelationType,
122        room::{
123            ImageInfo, MediaSource, ThumbnailInfo,
124            avatar::{self, RoomAvatarEventContent},
125            encryption::PossiblyRedactedRoomEncryptionEventContent,
126            history_visibility::HistoryVisibility,
127            member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
128            message::{
129                AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
130                ImageMessageEventContent, MessageType, RoomMessageEventContent,
131                TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
132                UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
133            },
134            name::RoomNameEventContent,
135            pinned_events::RoomPinnedEventsEventContent,
136            power_levels::{
137                RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
138            },
139            server_acl::RoomServerAclEventContent,
140            topic::RoomTopicEventContent,
141        },
142        space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
143        tag::{TagInfo, TagName},
144        typing::SyncTypingEvent,
145    },
146    int,
147    push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
148    serde::Raw,
149    time::Instant,
150    uint,
151};
152#[cfg(feature = "experimental-encrypted-state-events")]
153use ruma::{
154    events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
155    serde::JsonCastable,
156};
157use serde::de::DeserializeOwned;
158use thiserror::Error;
159use tokio::{join, sync::broadcast};
160use tracing::{debug, error, info, instrument, trace, warn};
161
162use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
163pub use self::{
164    member::{RoomMember, RoomMemberRole},
165    messages::{
166        EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
167        Relations, RelationsOptions, ThreadRoots,
168    },
169};
170#[cfg(feature = "e2e-encryption")]
171use crate::encryption::backups::BackupState;
172#[cfg(doc)]
173use crate::event_cache::EventCache;
174#[cfg(feature = "experimental-encrypted-state-events")]
175use crate::room::futures::{SendRawStateEvent, SendStateEvent};
176use crate::{
177    BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
178    attachment::{AttachmentConfig, AttachmentInfo},
179    client::WeakClient,
180    config::RequestConfig,
181    error::{BeaconError, WrongRoomState},
182    event_cache::{self, EventCacheDropHandles, RoomEventCache},
183    event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
184    live_location_share::ObservableLiveLocation,
185    media::{MediaFormat, MediaRequestParameters},
186    notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
187    room::{
188        knock_requests::{KnockRequest, KnockRequestMemberInfo},
189        power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
190        privacy_settings::RoomPrivacySettings,
191    },
192    sync::RoomUpdate,
193    utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
194};
195
196pub mod edit;
197pub mod futures;
198pub mod identity_status_changes;
199/// Contains code related to requests to join a room.
200pub mod knock_requests;
201mod member;
202mod messages;
203pub mod power_levels;
204pub mod reply;
205
206pub mod calls;
207
208/// Contains all the functionality for modifying the privacy settings in a room.
209pub mod privacy_settings;
210
211#[cfg(feature = "e2e-encryption")]
212pub(crate) mod shared_room_history;
213
/// A struct containing methods that are common for Joined, Invited and Left
/// Rooms
///
/// It pairs the underlying [`BaseRoom`] with the [`Client`] it belongs to, and
/// dereferences to the [`BaseRoom`].
#[derive(Debug, Clone)]
pub struct Room {
    /// The underlying room, reachable through the `Deref` implementation.
    inner: BaseRoom,
    /// The client used to make requests on behalf of this room.
    pub(crate) client: Client,
}
221
222impl Deref for Room {
223    type Target = BaseRoom;
224
225    fn deref(&self) -> &Self::Target {
226        &self.inner
227    }
228}
229
/// A 4 second duration related to typing notices.
// NOTE(review): presumably the validity period requested when sending a typing
// notice — confirm at the use site.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
/// A 3 second duration related to re-sending typing notices.
// NOTE(review): presumably the minimum delay before an active typing notice is
// sent again, kept below `TYPING_NOTICE_TIMEOUT` — confirm at the use site.
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
232
/// A thread subscription, according to the semantics of MSC4306.
///
/// The only information carried is how the subscription came to be, see
/// [`ThreadSubscription::automatic`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadSubscription {
    /// Whether the subscription was made automatically by a client, not by
    /// manual user choice.
    pub automatic: bool,
}
240
/// Context allowing to compute the push actions for a given event.
///
/// Both fields are private; an instance is built with [`PushContext::new`].
#[derive(Debug)]
pub struct PushContext {
    /// The Ruma context used to compute the push actions.
    push_condition_room_ctx: PushConditionRoomCtx,

    /// Push rules for this room, based on the push rules state event, or the
    /// global server default as defined by [`Ruleset::server_default`].
    push_rules: Ruleset,
}
251
252impl PushContext {
253    /// Create a new [`PushContext`] from its inner components.
254    pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
255        Self { push_condition_room_ctx, push_rules }
256    }
257
258    /// Compute the push rules for a given event.
259    pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
260        self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
261    }
262
263    /// Compute the push rules for a given event, with extra logging to help
264    /// debugging.
265    #[doc(hidden)]
266    #[instrument(skip_all)]
267    pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
268        let rules = self
269            .push_rules
270            .iter()
271            .filter_map(|r| {
272                if !r.enabled() {
273                    return None;
274                }
275
276                let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
277
278                let conditions = match r {
279                    AnyPushRuleRef::Override(r) => {
280                        format!("{:?}", r.conditions)
281                    }
282                    AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
283                    AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
284                    AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
285                    AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
286                    _ => "<unknown push rule kind>".to_owned(),
287                };
288
289                Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
290            })
291            .collect::<Vec<_>>()
292            .join("\n");
293        trace!("rules:\n\n{rules}\n\n");
294
295        let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
296
297        if let Some(found) = found {
298            trace!("rule {} matched", found.rule_id());
299            found.actions().to_owned()
300        } else {
301            trace!("no match");
302            Vec::new()
303        }
304    }
305}
306
// Build a media message content value of type `$t`, choosing between its
// `Image`, `Audio`, `Video` and `File` variants from the top-level type of
// `$content_type`.
//
// All arguments but `$t` are identifiers of variables in scope at the call
// site:
//
// * `$content_type` - the content type; `type_()` picks the variant and
//   `as_ref()` provides the string stored as `mimetype`.
// * `$filename` - the name of the file.
// * `$source` - the media source handed to the content constructor.
// * `$caption` - an optional `TextMessageEventContent` used as body/caption.
// * `$info` - an optional `AttachmentInfo`, converted to the info type of the
//   selected variant.
// * `$thumbnail` - an optional pair, split into thumbnail source and
//   thumbnail info.
macro_rules! make_media_type {
    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
        // If caption is set, use it as body, and filename as the file name; otherwise,
        // body is the filename, and the filename is not set.
        // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
        let (body, formatted, filename) = match $caption {
            Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
            None => ($filename, None, None),
        };

        // Turn the optional pair into a pair of options.
        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();

        match $content_type.type_() {
            mime::IMAGE => {
                // Start from the provided info (or the default one) and always
                // overwrite the mimetype and thumbnail fields.
                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(ImageMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename
                });
                <$t>::Image(content)
            }

            mime::AUDIO => {
                // The thumbnail is not used for audio content.
                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
                    formatted,
                    filename
                });

                // The audio details block is only attached when both a
                // duration and a waveform are available. `$info` is only
                // borrowed here, as it is consumed further below.
                if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
                 let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
                    // Each sample is clamped to [0, 1] and then scaled to the
                    // `0..=UnstableAmplitude::MAX` range.
                    let waveform = waveform_vec
                        .iter()
                        .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
                        .map(Into::into)
                        .collect();
                    content.audio =
                        Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
                }

                // Voice messages additionally carry the (empty) voice block.
                if matches!($info, Some(AttachmentInfo::Voice(_))) {
                    content.voice = Some(UnstableVoiceContentBlock::new());
                }

                // `$info` is consumed here, hence the borrows above.
                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
                audio_info.mimetype = Some($content_type.as_ref().to_owned());
                let content = content.info(Box::new(audio_info));

                <$t>::Audio(content)
            }

            mime::VIDEO => {
                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(VideoMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename
                });
                <$t>::Video(content)
            }

            // Any other top-level type is sent as a plain file.
            _ => {
                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(FileMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename,
                });
                <$t>::File(content)
            }
        }
    }};
}
392
393impl Room {
394    /// Create a new `Room`
395    ///
396    /// # Arguments
397    /// * `client` - The client used to make requests.
398    ///
399    /// * `room` - The underlying room.
400    pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
401        Self { inner: room, client }
402    }
403
404    /// Leave this room.
405    /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
406    /// automatically.
407    ///
408    /// Only invited and joined rooms can be left.
409    #[doc(alias = "reject_invitation")]
410    #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
411    async fn leave_impl(&self) -> (Result<()>, &Room) {
412        let state = self.state();
413        if state == RoomState::Left {
414            return (
415                Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
416                    "Joined or Invited",
417                    state,
418                )))),
419                self,
420            );
421        }
422
423        // If the room was in Invited state we should also forget it when declining the
424        // invite.
425        let should_forget = matches!(self.state(), RoomState::Invited);
426
427        let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
428        let response = self.client.send(request).await;
429
430        // The server can return with an error that is acceptable to ignore. Let's find
431        // which one.
432        if let Err(error) = response {
433            #[allow(clippy::collapsible_match)]
434            let ignore_error = if let Some(error) = error.client_api_error_kind() {
435                match error {
436                    // The user is trying to leave a room but doesn't have permissions to do so.
437                    // Let's consider the user has left the room.
438                    ErrorKind::Forbidden => true,
439                    _ => false,
440                }
441            } else {
442                false
443            };
444
445            error!(?error, ignore_error, should_forget, "Failed to leave the room");
446
447            if !ignore_error {
448                return (Err(error.into()), self);
449            }
450        }
451
452        if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
453            return (Err(e.into()), self);
454        }
455
456        if should_forget {
457            trace!("Trying to forget the room");
458
459            if let Err(error) = self.forget().await {
460                error!(?error, "Failed to forget the room");
461            }
462        }
463
464        (Ok(()), self)
465    }
466
467    /// Leave this room and all predecessors.
468    /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
469    /// automatically.
470    ///
471    /// Only invited and joined rooms can be left.
472    /// Will return an error if the current room fails to leave but
473    /// will only warn if a predecessor fails to leave.
474    pub async fn leave(&self) -> Result<()> {
475        let mut rooms: Vec<Room> = vec![self.clone()];
476        let mut current_room = self;
477
478        while let Some(predecessor) = current_room.predecessor_room() {
479            let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
480
481            if let Some(predecessor_room) = maybe_predecessor_room {
482                rooms.push(predecessor_room.clone());
483                current_room = rooms.last().expect("Room just pushed so can't be empty");
484            } else {
485                warn!("Cannot find predecessor room");
486                break;
487            }
488        }
489
490        let batch_size = 5;
491
492        let rooms_futures: Vec<_> = rooms
493            .iter()
494            .filter_map(|room| match room.state() {
495                RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
496                    Some(room.leave_impl())
497                }
498                RoomState::Banned | RoomState::Left => None,
499            })
500            .collect();
501
502        let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
503
504        let mut maybe_this_room_failed_with: Option<Error> = None;
505
506        while let Some(result) = futures_stream.next().await {
507            if let (Err(e), room) = result {
508                if room.room_id() == self.room_id() {
509                    maybe_this_room_failed_with = Some(e);
510                } else {
511                    warn!("Failure while attempting to leave predecessor room: {e:?}");
512                }
513            }
514        }
515
516        maybe_this_room_failed_with.map_or(Ok(()), Err)
517    }
518
519    /// Join this room.
520    ///
521    /// Only invited and left rooms can be joined via this method.
522    #[doc(alias = "accept_invitation")]
523    pub async fn join(&self) -> Result<()> {
524        let prev_room_state = self.inner.state();
525
526        if prev_room_state == RoomState::Joined {
527            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
528                "Invited or Left",
529                prev_room_state,
530            ))));
531        }
532
533        self.client.join_room_by_id(self.room_id()).await?;
534
535        Ok(())
536    }
537
538    /// Get the inner client saved in this room instance.
539    ///
540    /// Returns the client this room is part of.
541    pub fn client(&self) -> Client {
542        self.client.clone()
543    }
544
545    /// Get the sync state of this room, i.e. whether it was fully synced with
546    /// the server.
547    pub fn is_synced(&self) -> bool {
548        self.inner.is_state_fully_synced()
549    }
550
551    /// Gets the avatar of this room, if set.
552    ///
553    /// Returns the avatar.
554    /// If a thumbnail is requested no guarantee on the size of the image is
555    /// given.
556    ///
557    /// # Arguments
558    ///
559    /// * `format` - The desired format of the avatar.
560    ///
561    /// # Examples
562    ///
563    /// ```no_run
564    /// # use matrix_sdk::Client;
565    /// # use matrix_sdk::ruma::room_id;
566    /// # use matrix_sdk::media::MediaFormat;
567    /// # use url::Url;
568    /// # let homeserver = Url::parse("http://example.com").unwrap();
569    /// # async {
570    /// # let user = "example";
571    /// let client = Client::new(homeserver).await.unwrap();
572    /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
573    /// let room_id = room_id!("!roomid:example.com");
574    /// let room = client.get_room(&room_id).unwrap();
575    /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
576    ///     std::fs::write("avatar.png", avatar);
577    /// }
578    /// # };
579    /// ```
580    pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
581        let Some(url) = self.avatar_url() else { return Ok(None) };
582        let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
583        Ok(Some(self.client.media().get_media_content(&request, true).await?))
584    }
585
586    /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
587    /// returns a `Messages` struct that contains a chunk of room and state
588    /// events (`RoomEvent` and `AnyStateEvent`).
589    ///
590    /// With the encryption feature, messages are decrypted if possible. If
591    /// decryption fails for an individual message, that message is returned
592    /// undecrypted.
593    ///
594    /// # Examples
595    ///
596    /// ```no_run
597    /// use matrix_sdk::{Client, room::MessagesOptions};
598    /// # use matrix_sdk::ruma::{
599    /// #     api::client::filter::RoomEventFilter,
600    /// #     room_id,
601    /// # };
602    /// # use url::Url;
603    ///
604    /// # let homeserver = Url::parse("http://example.com").unwrap();
605    /// # async {
606    /// let options =
607    ///     MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
608    ///
609    /// let mut client = Client::new(homeserver).await.unwrap();
610    /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
611    /// assert!(room.messages(options).await.is_ok());
612    /// # };
613    /// ```
614    #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
615    pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
616        let room_id = self.inner.room_id();
617        let request = options.into_request(room_id);
618        let http_response = self.client.send(request).await?;
619
620        let push_ctx = self.push_context().await?;
621        let chunk = join_all(
622            http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
623        )
624        .await;
625
626        // Save the loaded events into the event cache, if it's set up.
627        if let Ok((cache, _handles)) = self.event_cache().await {
628            cache.save_events(chunk.clone()).await;
629        }
630
631        Ok(Messages {
632            start: http_response.start,
633            end: http_response.end,
634            chunk,
635            state: http_response.state,
636        })
637    }
638
639    /// Register a handler for events of a specific type, within this room.
640    ///
641    /// This method works the same way as [`Client::add_event_handler`], except
642    /// that the handler will only be called for events within this room. See
643    /// that method for more details on event handler functions.
644    ///
645    /// `room.add_event_handler(hdl)` is equivalent to
646    /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
647    /// convenient in your use case.
648    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
649    where
650        Ev: SyncEvent + DeserializeOwned + Send + 'static,
651        H: EventHandler<Ev, Ctx>,
652    {
653        self.client.add_room_event_handler(self.room_id(), handler)
654    }
655
656    /// Subscribe to all updates for this room.
657    ///
658    /// The returned receiver will receive a new message for each sync response
659    /// that contains updates for this room.
660    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
661        self.client.subscribe_to_room_updates(self.room_id())
662    }
663
664    /// Subscribe to typing notifications for this room.
665    ///
666    /// The returned receiver will receive a new vector of user IDs for each
667    /// sync response that contains 'm.typing' event. The current user ID will
668    /// be filtered out.
669    pub fn subscribe_to_typing_notifications(
670        &self,
671    ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
672        let (sender, receiver) = broadcast::channel(16);
673        let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
674            let own_user_id = self.own_user_id().to_owned();
675            move |event: SyncTypingEvent| async move {
676                // Ignore typing notifications from own user.
677                let typing_user_ids = event
678                    .content
679                    .user_ids
680                    .into_iter()
681                    .filter(|user_id| *user_id != own_user_id)
682                    .collect();
683                // Ignore the result. It can only fail if there are no listeners.
684                let _ = sender.send(typing_user_ids);
685            }
686        });
687        let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
688        (drop_guard, receiver)
689    }
690
691    /// Subscribe to updates about users who are in "pin violation" i.e. their
692    /// identity has changed and the user has not yet acknowledged this.
693    ///
694    /// The returned receiver will receive a new vector of
695    /// [`IdentityStatusChange`] each time a /keys/query response shows a
696    /// changed identity for a member of this room, or a sync shows a change
697    /// to the membership of an affected user. (Changes to the current user are
698    /// not directly included, but some changes to the current user's identity
699    /// can trigger changes to how we see other users' identities, which
700    /// will be included.)
701    ///
702    /// The first item in the stream provides the current state of the room:
703    /// each member of the room who is not in "pinned" or "verified" state will
704    /// be included (except the current user).
705    ///
706    /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
707    /// `PinViolation` then a warning should be displayed to the user. If it is
708    /// set to `Pinned` then no warning should be displayed.
709    ///
710    /// Note that if a user who is in pin violation leaves the room, a `Pinned`
711    /// update is sent, to indicate that the warning should be removed, even
712    /// though the user's identity is not necessarily pinned.
713    #[cfg(feature = "e2e-encryption")]
714    pub async fn subscribe_to_identity_status_changes(
715        &self,
716    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
717        IdentityStatusChanges::create_stream(self.clone()).await
718    }
719
    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
    /// decrypted if needs be.
    ///
    /// This is the variant compiled when the
    /// `experimental-encrypted-state-events` feature is disabled: only
    /// encrypted message-like events are considered for decryption.
    ///
    /// If the event isn't an encrypted one, or if it cannot be decrypted, it
    /// is returned as a plaintext `TimelineEvent`, with its push actions set
    /// when a `push_ctx` is provided.
    ///
    /// Only logs from the crypto crate will indicate a failure to decrypt.
    #[cfg(not(feature = "experimental-encrypted-state-events"))]
    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
    async fn try_decrypt_event(
        &self,
        event: Raw<AnyTimelineEvent>,
        push_ctx: Option<&PushContext>,
    ) -> TimelineEvent {
        // The whole `if let` statement below only exists when the
        // `e2e-encryption` feature is enabled. It returns early only if the
        // event deserializes as an original (non-redacted) `m.room.encrypted`
        // message-like event *and* its decryption succeeds.
        #[cfg(feature = "e2e-encryption")]
        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
            SyncMessageLikeEvent::Original(_),
        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
            && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
        {
            return event;
        }

        // Fallback: keep the event as received.
        let mut event = TimelineEvent::from_plaintext(event.cast());
        if let Some(push_ctx) = push_ctx {
            event.set_push_actions(push_ctx.for_event(event.raw()).await);
        }

        event
    }
747
    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
    /// decrypted if needs be.
    ///
    /// This is the variant compiled when the
    /// `experimental-encrypted-state-events` feature is enabled: both
    /// encrypted message-like events and encrypted state events are
    /// considered for decryption.
    ///
    /// If the event isn't an encrypted one, or if it cannot be decrypted, it
    /// is returned as a plaintext `TimelineEvent`, with its push actions set
    /// when a `push_ctx` is provided.
    ///
    /// Only logs from the crypto crate will indicate a failure to decrypt.
    #[cfg(feature = "experimental-encrypted-state-events")]
    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
    async fn try_decrypt_event(
        &self,
        event: Raw<AnyTimelineEvent>,
        push_ctx: Option<&PushContext>,
    ) -> TimelineEvent {
        // If we have either an encrypted message-like or state event, try to decrypt.
        match event.deserialize_as::<AnySyncTimelineEvent>() {
            // Original (non-redacted) encrypted message-like event.
            Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
                SyncMessageLikeEvent::Original(_),
            ))) => {
                // A decryption failure falls through to the plaintext
                // fallback below.
                if let Ok(event) = self
                    .decrypt_event(
                        event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
                        push_ctx,
                    )
                    .await
                {
                    return event;
                }
            }
            // Original (non-redacted) encrypted state event.
            Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
                SyncStateEvent::Original(_),
            ))) => {
                // Same as above: on failure, use the plaintext fallback.
                if let Ok(event) = self
                    .decrypt_event(
                        event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
                        push_ctx,
                    )
                    .await
                {
                    return event;
                }
            }
            // Anything else, including events that fail to deserialize, is
            // kept as received.
            _ => {}
        }

        // Fallback: keep the event as received.
        let mut event = TimelineEvent::from_plaintext(event.cast());
        if let Some(push_ctx) = push_ctx {
            event.set_push_actions(push_ctx.for_event(event.raw()).await);
        }

        event
    }
797
798    /// Fetch the event with the given `EventId` in this room.
799    ///
800    /// It uses the given [`RequestConfig`] if provided, or the client's default
801    /// one otherwise.
802    pub async fn event(
803        &self,
804        event_id: &EventId,
805        request_config: Option<RequestConfig>,
806    ) -> Result<TimelineEvent> {
807        let request =
808            get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
809
810        let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
811        let push_ctx = self.push_context().await?;
812        let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
813
814        // Save the event into the event cache, if it's set up.
815        if let Ok((cache, _handles)) = self.event_cache().await {
816            cache.save_events([event.clone()]).await;
817        }
818
819        Ok(event)
820    }
821
822    /// Try to load the event from the [`EventCache`][crate::event_cache], if
823    /// it's enabled, or fetch it from the homeserver.
824    ///
825    /// When running the request against the homeserver, it uses the given
826    /// [`RequestConfig`] if provided, or the client's default one
827    /// otherwise.
828    pub async fn load_or_fetch_event(
829        &self,
830        event_id: &EventId,
831        request_config: Option<RequestConfig>,
832    ) -> Result<TimelineEvent> {
833        match self.event_cache().await {
834            Ok((event_cache, _drop_handles)) => {
835                if let Some(event) = event_cache.find_event(event_id).await? {
836                    return Ok(event);
837                }
838                // Fallthrough: try with a request.
839            }
840            Err(err) => {
841                debug!("error when getting the event cache: {err}");
842            }
843        }
844        self.event(event_id, request_config).await
845    }
846
847    /// Try to load the event and its relations from the
848    /// [`EventCache`][crate::event_cache], if it's enabled, or fetch it
849    /// from the homeserver.
850    ///
851    /// You can control which types of related events are retrieved using
852    /// `filter`. A `None` value will retrieve any type of related event.
853    ///
854    /// If the event is found in the event cache, but we can't find any
855    /// relations for it there, then we will still attempt to fetch the
856    /// relations from the homeserver.
857    ///
858    /// When running any request against the homeserver, it uses the given
859    /// [`RequestConfig`] if provided, or the client's default one
860    /// otherwise.
861    ///
862    /// Returns a tuple formed of the event and a vector of its relations (that
863    /// can be empty).
864    pub async fn load_or_fetch_event_with_relations(
865        &self,
866        event_id: &EventId,
867        filter: Option<Vec<RelationType>>,
868        request_config: Option<RequestConfig>,
869    ) -> Result<(TimelineEvent, Vec<TimelineEvent>)> {
870        let fetch_relations = async || {
871            // If there's only a single filter, we can use a more efficient request,
872            // specialized on the filter type.
873            //
874            // Otherwise, we need to get all the relations:
875            // - either because no filters implies we fetch all relations,
876            // - or because there are multiple filters and we must filter out manually.
877            let include_relations = if let Some(filter) = &filter
878                && filter.len() == 1
879            {
880                IncludeRelations::RelationsOfType(filter[0].clone())
881            } else {
882                IncludeRelations::AllRelations
883            };
884
885            let mut opts = RelationsOptions {
886                include_relations,
887                recurse: true,
888                limit: Some(uint!(256)),
889                ..Default::default()
890            };
891
892            let mut events = Vec::new();
893            loop {
894                match self.relations(event_id.to_owned(), opts.clone()).await {
895                    Ok(relations) => {
896                        if let Some(filter) = filter.as_ref() {
897                            // Manually filter out the relation types we're interested in.
898                            events.extend(relations.chunk.into_iter().filter_map(|ev| {
899                                let (rel_type, _) = extract_relation(ev.raw())?;
900                                filter
901                                    .iter()
902                                    .any(|ruma_filter| ruma_filter == &rel_type)
903                                    .then_some(ev)
904                            }));
905                        } else {
906                            // No filter: include all events from the response.
907                            events.extend(relations.chunk);
908                        }
909
910                        if let Some(next_from) = relations.next_batch_token {
911                            opts.from = Some(next_from);
912                        } else {
913                            break events;
914                        }
915                    }
916
917                    Err(err) => {
918                        warn!(%event_id, "error when loading relations of pinned event from server: {err}");
919                        break events;
920                    }
921                }
922            }
923        };
924
925        // First, try to load the event *and* its relations from the event cache, all at
926        // once.
927        let event_cache = match self.event_cache().await {
928            Ok((event_cache, drop_handles)) => {
929                if let Some((event, mut relations)) =
930                    event_cache.find_event_with_relations(event_id, filter.clone()).await?
931                {
932                    if relations.is_empty() {
933                        // The event cache doesn't have any relations for this event, try to fetch
934                        // them from the server instead.
935                        relations = fetch_relations().await;
936                    }
937
938                    return Ok((event, relations));
939                }
940
941                // Otherwise, get the event from the server.
942                Some((event_cache, drop_handles))
943            }
944
945            Err(err) => {
946                debug!("error when getting the event cache: {err}");
947                // Fallthrough: try with a request.
948                None
949            }
950        };
951
952        // Fetch the event from the server. A failure here is fatal, as we must return
953        // the target event.
954        let event = self.event(event_id, request_config).await?;
955
956        // Try to get the relations from the event cache (if we have one).
957        if let Some((event_cache, _drop_handles)) = event_cache
958            && let Some(relations) =
959                event_cache.find_event_relations(event_id, filter.clone()).await.ok()
960            && !relations.is_empty()
961        {
962            return Ok((event, relations));
963        }
964
965        // We couldn't find the relations in the event cache; fetch them from the
966        // server.
967        Ok((event, fetch_relations().await))
968    }
969
970    /// Fetch the event with the given `EventId` in this room, using the
971    /// `/context` endpoint to get more information.
972    pub async fn event_with_context(
973        &self,
974        event_id: &EventId,
975        lazy_load_members: bool,
976        context_size: UInt,
977        request_config: Option<RequestConfig>,
978    ) -> Result<EventWithContextResponse> {
979        let mut request =
980            context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
981
982        request.limit = context_size;
983
984        if lazy_load_members {
985            request.filter.lazy_load_options =
986                LazyLoadOptions::Enabled { include_redundant_members: false };
987        }
988
989        let response = self.client.send(request).with_request_config(request_config).await?;
990
991        let push_ctx = self.push_context().await?;
992        let push_ctx = push_ctx.as_ref();
993        let target_event = if let Some(event) = response.event {
994            Some(self.try_decrypt_event(event, push_ctx).await)
995        } else {
996            None
997        };
998
999        // Note: the joined future will fail if any future failed, but
1000        // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
1001        // decryption error, so we should prevent against most bad cases here.
1002        let (events_before, events_after) = join!(
1003            join_all(
1004                response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1005            ),
1006            join_all(
1007                response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1008            ),
1009        );
1010
1011        // Save the loaded events into the event cache, if it's set up.
1012        if let Ok((cache, _handles)) = self.event_cache().await {
1013            let mut events_to_save: Vec<TimelineEvent> = Vec::new();
1014            if let Some(event) = &target_event {
1015                events_to_save.push(event.clone());
1016            }
1017
1018            for event in &events_before {
1019                events_to_save.push(event.clone());
1020            }
1021
1022            for event in &events_after {
1023                events_to_save.push(event.clone());
1024            }
1025
1026            cache.save_events(events_to_save).await;
1027        }
1028
1029        Ok(EventWithContextResponse {
1030            event: target_event,
1031            events_before,
1032            events_after,
1033            state: response.state,
1034            prev_batch_token: response.start,
1035            next_batch_token: response.end,
1036        })
1037    }
1038
1039    pub(crate) async fn request_members(&self) -> Result<()> {
1040        self.client
1041            .locks()
1042            .members_request_deduplicated_handler
1043            .run(self.room_id().to_owned(), async move {
1044                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
1045                let response = self
1046                    .client
1047                    .send(request.clone())
1048                    .with_request_config(
1049                        // In some cases it can take longer than 30s to load:
1050                        // https://github.com/element-hq/synapse/issues/16872
1051                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
1052                    )
1053                    .await?;
1054
1055                // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
1056                Box::pin(self.client.base_client().receive_all_members(
1057                    self.room_id(),
1058                    &request,
1059                    &response,
1060                ))
1061                .await?;
1062
1063                Ok(())
1064            })
1065            .await
1066    }
1067
    /// Request to update the encryption state for this room.
    ///
    /// It does nothing if the encryption state is already
    /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
    ///
    /// Otherwise the `m.room.encryption` state event is requested from the
    /// server, and the result (the event content, or the absence of the
    /// event) is persisted in the state store and in this room's `RoomInfo`.
    /// The request runs through the client's
    /// `encryption_state_deduplicated_handler`, keyed by this room's ID.
    pub async fn request_encryption_state(&self) -> Result<()> {
        // Nothing to do when the state is already known.
        if !self.inner.encryption_state().is_unknown() {
            return Ok(());
        }

        self.client
            .locks()
            .encryption_state_deduplicated_handler
            .run(self.room_id().to_owned(), async move {
                // Request the event from the server.
                let request = get_state_event_for_key::v3::Request::new(
                    self.room_id().to_owned(),
                    StateEventType::RoomEncryption,
                    "".to_owned(),
                );
                // A `NotFound` API error means there is no such event, which is recorded
                // as `None`; any other error aborts the whole operation.
                let response = match self.client.send(request).await {
                    Ok(response) => Some(
                        response
                            .into_content()
                            .deserialize_as_unchecked::<PossiblyRedactedRoomEncryptionEventContent>(
                            )?,
                    ),
                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
                    Err(err) => return Err(err.into()),
                };

                // The guard is held until the end of this block, i.e. while the room info
                // is cloned, saved and set again.
                let _state_store_lock = self.client.base_client().state_store_lock().lock().await;

                // Persist the event and the fact that we requested it from the server in
                // `RoomInfo`.
                let mut room_info = self.clone_info();
                room_info.mark_encryption_state_synced();
                room_info.set_encryption_event(response);
                let mut changes = StateChanges::default();
                changes.add_room(room_info.clone());

                // Write to the store first; the in-memory room info is only updated once
                // the save succeeded.
                self.client.state_store().save_changes(&changes).await?;
                self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());

                Ok(())
            })
            .await
    }
1115
    /// Check the encryption state of this room.
    ///
    /// This is a synchronous accessor that returns the encryption state held
    /// by the inner room.
    ///
    /// If the result is [`EncryptionState::Unknown`], one might want to call
    /// [`Room::request_encryption_state`].
    pub fn encryption_state(&self) -> EncryptionState {
        self.inner.encryption_state()
    }
1123
1124    /// Force to update the encryption state by calling
1125    /// [`Room::request_encryption_state`], and then calling
1126    /// [`Room::encryption_state`].
1127    ///
1128    /// This method is useful to ensure the encryption state is up-to-date.
1129    pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
1130        self.request_encryption_state().await?;
1131
1132        Ok(self.encryption_state())
1133    }
1134
1135    /// Gets additional context info about the client crypto.
1136    #[cfg(feature = "e2e-encryption")]
1137    pub async fn crypto_context_info(&self) -> CryptoContextInfo {
1138        let encryption = self.client.encryption();
1139
1140        let this_device_is_verified = match encryption.get_own_device().await {
1141            Ok(Some(device)) => device.is_verified_with_cross_signing(),
1142
1143            // Should not happen, there will always be an own device
1144            _ => true,
1145        };
1146
1147        let backup_exists_on_server =
1148            encryption.backups().exists_on_server().await.unwrap_or(false);
1149
1150        CryptoContextInfo {
1151            device_creation_ts: encryption.device_creation_timestamp().await,
1152            this_device_is_verified,
1153            is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1154            backup_exists_on_server,
1155        }
1156    }
1157
1158    fn are_events_visible(&self) -> bool {
1159        if let RoomState::Invited = self.inner.state() {
1160            return matches!(
1161                self.inner.history_visibility_or_default(),
1162                HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1163            );
1164        }
1165
1166        true
1167    }
1168
1169    /// Sync the member list with the server.
1170    ///
1171    /// This method will de-duplicate requests if it is called multiple times in
1172    /// quick succession, in that case the return value will be `None`. This
1173    /// method does nothing if the members are already synced.
1174    pub async fn sync_members(&self) -> Result<()> {
1175        if !self.are_events_visible() {
1176            return Ok(());
1177        }
1178
1179        if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1180    }
1181
    /// Get a specific member of this room.
    ///
    /// *Note*: This method will fetch the members from the homeserver if the
    /// member list isn't synchronized due to member lazy loading. Because of
    /// that it might panic if it isn't run on a tokio thread.
    ///
    /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
    /// method that doesn't do any requests.
    ///
    /// # Arguments
    ///
    /// * `user_id` - The ID of the user that should be fetched out of the
    ///   store.
    ///
    /// # Errors
    ///
    /// Returns an error if syncing the members fails, or if the subsequent
    /// lookup fails.
    pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
        // Make sure the member list is synced before looking the member up.
        self.sync_members().await?;
        self.get_member_no_sync(user_id).await
    }
1199
1200    /// Get a specific member of this room.
1201    ///
1202    /// *Note*: This method will not fetch the members from the homeserver if
1203    /// the member list isn't synchronized due to member lazy loading. Thus,
1204    /// members could be missing.
1205    ///
1206    /// Use [get_member()](#method.get_member) if you want to ensure to always
1207    /// have the full member list to chose from.
1208    ///
1209    /// # Arguments
1210    ///
1211    /// * `user_id` - The ID of the user that should be fetched out of the
1212    ///   store.
1213    pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1214        Ok(self
1215            .inner
1216            .get_member(user_id)
1217            .await?
1218            .map(|member| RoomMember::new(self.client.clone(), member)))
1219    }
1220
    /// Get members for this room, with the given memberships.
    ///
    /// *Note*: This method will fetch the members from the homeserver if the
    /// member list isn't synchronized due to member lazy loading. Because of
    /// that it might panic if it isn't run on a tokio thread.
    ///
    /// Use [members_no_sync()](#method.members_no_sync) if you want a
    /// method that doesn't do any requests.
    ///
    /// # Errors
    ///
    /// Returns an error if syncing the members fails, or if the subsequent
    /// lookup fails.
    pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
        // Make sure the member list is synced before reading it.
        self.sync_members().await?;
        self.members_no_sync(memberships).await
    }
1233
1234    /// Get members for this room, with the given memberships.
1235    ///
1236    /// *Note*: This method will not fetch the members from the homeserver if
1237    /// the member list isn't synchronized due to member lazy loading. Thus,
1238    /// members could be missing.
1239    ///
1240    /// Use [members()](#method.members) if you want to ensure to always get
1241    /// the full member list.
1242    pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1243        Ok(self
1244            .inner
1245            .members(memberships)
1246            .await?
1247            .into_iter()
1248            .map(|member| RoomMember::new(self.client.clone(), member))
1249            .collect())
1250    }
1251
1252    /// Sets the display name of the current user within this room.
1253    ///
1254    /// *Note*: This is different to [`crate::Account::set_display_name`] which
1255    /// updates the user's display name across all of their rooms.
1256    pub async fn set_own_member_display_name(
1257        &self,
1258        display_name: Option<String>,
1259    ) -> Result<send_state_event::v3::Response> {
1260        let user_id = self.own_user_id();
1261        let member_event =
1262            self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1263
1264        let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1265            return Err(Error::InsufficientData);
1266        };
1267
1268        let event = raw_event.deserialize()?;
1269
1270        let mut content = match event {
1271            SyncStateEvent::Original(original_event) => original_event.content,
1272            SyncStateEvent::Redacted(redacted_event) => {
1273                RoomMemberEventContent::new(redacted_event.content.membership)
1274            }
1275        };
1276
1277        content.displayname = display_name;
1278        self.send_state_event_for_key(user_id, content).await
1279    }
1280
1281    /// Get all state events of a given type in this room.
1282    pub async fn get_state_events(
1283        &self,
1284        event_type: StateEventType,
1285    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1286        self.client
1287            .state_store()
1288            .get_state_events(self.room_id(), event_type)
1289            .await
1290            .map_err(Into::into)
1291    }
1292
1293    /// Get all state events of a given statically-known type in this room.
1294    ///
1295    /// # Examples
1296    ///
1297    /// ```no_run
1298    /// # async {
1299    /// # let room: matrix_sdk::Room = todo!();
1300    /// use matrix_sdk::ruma::{
1301    ///     events::room::member::RoomMemberEventContent, serde::Raw,
1302    /// };
1303    ///
1304    /// let room_members =
1305    ///     room.get_state_events_static::<RoomMemberEventContent>().await?;
1306    /// # anyhow::Ok(())
1307    /// # };
1308    /// ```
1309    pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1310    where
1311        C: StaticEventContent<IsPrefix = ruma::events::False>
1312            + StaticStateEventContent
1313            + RedactContent,
1314        C::Redacted: RedactedStateEventContent,
1315    {
1316        Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1317    }
1318
1319    /// Get the state events of a given type with the given state keys in this
1320    /// room.
1321    pub async fn get_state_events_for_keys(
1322        &self,
1323        event_type: StateEventType,
1324        state_keys: &[&str],
1325    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1326        self.client
1327            .state_store()
1328            .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1329            .await
1330            .map_err(Into::into)
1331    }
1332
1333    /// Get the state events of a given statically-known type with the given
1334    /// state keys in this room.
1335    ///
1336    /// # Examples
1337    ///
1338    /// ```no_run
1339    /// # async {
1340    /// # let room: matrix_sdk::Room = todo!();
1341    /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1342    /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1343    ///
1344    /// let room_members = room
1345    ///     .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1346    ///         user_ids,
1347    ///     )
1348    ///     .await?;
1349    /// # anyhow::Ok(())
1350    /// # };
1351    /// ```
1352    pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1353        &self,
1354        state_keys: I,
1355    ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1356    where
1357        C: StaticEventContent<IsPrefix = ruma::events::False>
1358            + StaticStateEventContent
1359            + RedactContent,
1360        C::StateKey: Borrow<K>,
1361        C::Redacted: RedactedStateEventContent,
1362        K: AsRef<str> + Sized + Sync + 'a,
1363        I: IntoIterator<Item = &'a K> + Send,
1364        I::IntoIter: Send,
1365    {
1366        Ok(self
1367            .client
1368            .state_store()
1369            .get_state_events_for_keys_static(self.room_id(), state_keys)
1370            .await?)
1371    }
1372
1373    /// Get a specific state event in this room.
1374    pub async fn get_state_event(
1375        &self,
1376        event_type: StateEventType,
1377        state_key: &str,
1378    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1379        self.client
1380            .state_store()
1381            .get_state_event(self.room_id(), event_type, state_key)
1382            .await
1383            .map_err(Into::into)
1384    }
1385
    /// Get a specific state event of statically-known type with an empty state
    /// key in this room.
    ///
    /// This delegates to [`Room::get_state_event_static_for_key`] with an
    /// [`EmptyStateKey`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # async {
    /// # let room: matrix_sdk::Room = todo!();
    /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
    ///
    /// let power_levels = room
    ///     .get_state_event_static::<RoomPowerLevelsEventContent>()
    ///     .await?
    ///     .expect("every room has a power_levels event")
    ///     .deserialize()?;
    /// # anyhow::Ok(())
    /// # };
    /// ```
    pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
    where
        C: StaticEventContent<IsPrefix = ruma::events::False>
            + StaticStateEventContent<StateKey = EmptyStateKey>
            + RedactContent,
        C::Redacted: RedactedStateEventContent,
    {
        self.get_state_event_static_for_key(&EmptyStateKey).await
    }
1413
1414    /// Get a specific state event of statically-known type in this room.
1415    ///
1416    /// # Examples
1417    ///
1418    /// ```no_run
1419    /// # async {
1420    /// # let room: matrix_sdk::Room = todo!();
1421    /// use matrix_sdk::ruma::{
1422    ///     events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1423    /// };
1424    ///
1425    /// let member_event = room
1426    ///     .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1427    ///         "@alice:example.org"
1428    ///     ))
1429    ///     .await?;
1430    /// # anyhow::Ok(())
1431    /// # };
1432    /// ```
1433    pub async fn get_state_event_static_for_key<C, K>(
1434        &self,
1435        state_key: &K,
1436    ) -> Result<Option<RawSyncOrStrippedState<C>>>
1437    where
1438        C: StaticEventContent<IsPrefix = ruma::events::False>
1439            + StaticStateEventContent
1440            + RedactContent,
1441        C::StateKey: Borrow<K>,
1442        C::Redacted: RedactedStateEventContent,
1443        K: AsRef<str> + ?Sized + Sync,
1444    {
1445        Ok(self
1446            .client
1447            .state_store()
1448            .get_state_event_static_for_key(self.room_id(), state_key)
1449            .await?)
1450    }
1451
    /// Returns the spaces this room advertises as its parents, through its
    /// `m.space.parent` state events.
    ///
    /// Each parent is classified as a [`ParentSpace`]:
    ///
    /// * `Unverifiable` if the parent room isn't known to the client,
    /// * `Reciprocal` if the parent has a valid `m.space.child` event pointing
    ///   to this room,
    /// * `WithPowerlevel` if there is no such event, but the sender of the
    ///   `m.space.parent` event is allowed to send `m.space.child` in the
    ///   parent,
    /// * `Illegitimate` otherwise.
    ///
    /// Results are in no particular order.
    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
        // Implements this algorithm:
        // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships

        // Get all m.space.parent events for this room
        Ok(self
            .get_state_events_static::<SpaceParentEventContent>()
            .await?
            .into_iter()
            // Extract state key (ie. the parent's id) and sender
            .filter_map(|parent_event| match parent_event.deserialize() {
                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
                    Some((e.state_key.to_owned(), e.sender))
                }
                // Redacted parent events are skipped.
                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
                Err(e) => {
                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
                    None
                }
            })
            // Check whether the parent recognizes this room as its child
            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
                let Some(parent_room) = self.client.get_room(&state_key) else {
                    // We are not in the room, cannot check if the relationship is reciprocal
                    // TODO: try peeking into the room
                    return Ok(ParentSpace::Unverifiable(state_key));
                };
                // Get the m.space.child state of the parent with this room's id
                // as state key.
                if let Some(child_event) = parent_room
                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
                    .await?
                {
                    match child_event.deserialize() {
                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
                            // There is a valid m.space.child in the parent pointing to
                            // this room
                            return Ok(ParentSpace::Reciprocal(parent_room));
                        }
                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
                        Ok(SyncOrStrippedState::Stripped(_)) => {}
                        Err(e) => {
                            info!(
                                room_id = ?self.room_id(), parent_room_id = ?state_key,
                                "Could not deserialize m.space.child: {e}"
                            );
                        }
                    }
                    // Otherwise the event is either invalid or redacted. If
                    // redacted it would be missing the
                    // `via` key, thereby invalidating that end of the
                    // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
                }

                // No reciprocal m.space.child found, let's check if the sender has the
                // power to set it
                let Some(member) = parent_room.get_member(&sender).await? else {
                    // Sender is not even in the parent room
                    return Ok(ParentSpace::Illegitimate(parent_room));
                };

                if member.can_send_state(StateEventType::SpaceChild) {
                    // Sender does have the power to set m.space.child
                    Ok(ParentSpace::WithPowerlevel(parent_room))
                } else {
                    Ok(ParentSpace::Illegitimate(parent_room))
                }
            })
            // The per-parent futures are gathered into a `FuturesUnordered`, which is the
            // returned stream.
            .collect::<FuturesUnordered<_>>())
    }
1526
1527    /// Read account data in this room, from storage.
1528    pub async fn account_data(
1529        &self,
1530        data_type: RoomAccountDataEventType,
1531    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1532        self.client
1533            .state_store()
1534            .get_room_account_data_event(self.room_id(), data_type)
1535            .await
1536            .map_err(Into::into)
1537    }
1538
1539    /// Get account data of a statically-known type in this room, from storage.
1540    ///
1541    /// # Examples
1542    ///
1543    /// ```no_run
1544    /// # async {
1545    /// # let room: matrix_sdk::Room = todo!();
1546    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1547    ///
1548    /// match room.account_data_static::<FullyReadEventContent>().await? {
1549    ///     Some(fully_read) => {
1550    ///         println!("Found read marker: {:?}", fully_read.deserialize()?)
1551    ///     }
1552    ///     None => println!("No read marker for this room"),
1553    /// }
1554    /// # anyhow::Ok(())
1555    /// # };
1556    /// ```
1557    pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1558    where
1559        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1560    {
1561        Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1562    }
1563
1564    /// Check if all members of this room are verified and all their devices are
1565    /// verified.
1566    ///
1567    /// Returns true if all devices in the room are verified, otherwise false.
1568    #[cfg(feature = "e2e-encryption")]
1569    pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1570        let user_ids = self
1571            .client
1572            .state_store()
1573            .get_user_ids(self.room_id(), RoomMemberships::empty())
1574            .await?;
1575
1576        for user_id in user_ids {
1577            let devices = self.client.encryption().get_user_devices(&user_id).await?;
1578            let any_unverified = devices.devices().any(|d| !d.is_verified());
1579
1580            if any_unverified {
1581                return Ok(false);
1582            }
1583        }
1584
1585        Ok(true)
1586    }
1587
1588    /// Set the given account data event for this room.
1589    ///
1590    /// # Example
1591    /// ```
1592    /// # async {
1593    /// # let room: matrix_sdk::Room = todo!();
1594    /// # let event_id: ruma::OwnedEventId = todo!();
1595    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1596    /// let content = FullyReadEventContent::new(event_id);
1597    ///
1598    /// room.set_account_data(content).await?;
1599    /// # anyhow::Ok(())
1600    /// # };
1601    /// ```
1602    pub async fn set_account_data<T>(
1603        &self,
1604        content: T,
1605    ) -> Result<set_room_account_data::v3::Response>
1606    where
1607        T: RoomAccountDataEventContent,
1608    {
1609        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1610
1611        let request = set_room_account_data::v3::Request::new(
1612            own_user.to_owned(),
1613            self.room_id().to_owned(),
1614            &content,
1615        )?;
1616
1617        Ok(self.client.send(request).await?)
1618    }
1619
1620    /// Set the given raw account data event in this room.
1621    ///
1622    /// # Example
1623    /// ```
1624    /// # async {
1625    /// # let room: matrix_sdk::Room = todo!();
1626    /// use matrix_sdk::ruma::{
1627    ///     events::{
1628    ///         AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1629    ///         marked_unread::MarkedUnreadEventContent,
1630    ///     },
1631    ///     serde::Raw,
1632    /// };
1633    /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1634    /// let full_event: AnyRoomAccountDataEventContent =
1635    ///     marked_unread_content.clone().into();
1636    /// room.set_account_data_raw(
1637    ///     marked_unread_content.event_type(),
1638    ///     Raw::new(&full_event).unwrap(),
1639    /// )
1640    /// .await?;
1641    /// # anyhow::Ok(())
1642    /// # };
1643    /// ```
1644    pub async fn set_account_data_raw(
1645        &self,
1646        event_type: RoomAccountDataEventType,
1647        content: Raw<AnyRoomAccountDataEventContent>,
1648    ) -> Result<set_room_account_data::v3::Response> {
1649        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1650
1651        let request = set_room_account_data::v3::Request::new_raw(
1652            own_user.to_owned(),
1653            self.room_id().to_owned(),
1654            event_type,
1655            content,
1656        );
1657
1658        Ok(self.client.send(request).await?)
1659    }
1660
1661    /// Adds a tag to the room, or updates it if it already exists.
1662    ///
1663    /// Returns the [`create_tag::v3::Response`] from the server.
1664    ///
1665    /// # Arguments
1666    /// * `tag` - The tag to add or update.
1667    ///
1668    /// * `tag_info` - Information about the tag, generally containing the
1669    ///   `order` parameter.
1670    ///
1671    /// # Examples
1672    ///
1673    /// ```no_run
1674    /// # use std::str::FromStr;
1675    /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1676    /// # async {
1677    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1678    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1679    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1680    /// use matrix_sdk::ruma::events::tag::TagInfo;
1681    ///
1682    /// if let Some(room) = client.get_room(&room_id) {
1683    ///     let mut tag_info = TagInfo::new();
1684    ///     tag_info.order = Some(0.9);
1685    ///     let user_tag = UserTagName::from_str("u.work")?;
1686    ///
1687    ///     room.set_tag(TagName::User(user_tag), tag_info).await?;
1688    /// }
1689    /// # anyhow::Ok(()) };
1690    /// ```
1691    pub async fn set_tag(
1692        &self,
1693        tag: TagName,
1694        tag_info: TagInfo,
1695    ) -> Result<create_tag::v3::Response> {
1696        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1697        let request = create_tag::v3::Request::new(
1698            user_id.to_owned(),
1699            self.inner.room_id().to_owned(),
1700            tag.to_string(),
1701            tag_info,
1702        );
1703        Ok(self.client.send(request).await?)
1704    }
1705
1706    /// Removes a tag from the room.
1707    ///
1708    /// Returns the [`delete_tag::v3::Response`] from the server.
1709    ///
1710    /// # Arguments
1711    /// * `tag` - The tag to remove.
1712    pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1713        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1714        let request = delete_tag::v3::Request::new(
1715            user_id.to_owned(),
1716            self.inner.room_id().to_owned(),
1717            tag.to_string(),
1718        );
1719        Ok(self.client.send(request).await?)
1720    }
1721
1722    /// Add or remove the `m.favourite` flag for this room.
1723    ///
1724    /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1725    /// room, the tag will be removed too.
1726    ///
1727    /// # Arguments
1728    ///
1729    /// * `is_favourite` - Whether to mark this room as favourite.
1730    /// * `tag_order` - The order of the tag if any.
1731    pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1732        if is_favourite {
1733            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1734
1735            self.set_tag(TagName::Favorite, tag_info).await?;
1736
1737            if self.is_low_priority() {
1738                self.remove_tag(TagName::LowPriority).await?;
1739            }
1740        } else {
1741            self.remove_tag(TagName::Favorite).await?;
1742        }
1743        Ok(())
1744    }
1745
1746    /// Add or remove the `m.lowpriority` flag for this room.
1747    ///
1748    /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1749    /// room, the tag will be removed too.
1750    ///
1751    /// # Arguments
1752    ///
1753    /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1754    /// * `tag_order` - The order of the tag if any.
1755    pub async fn set_is_low_priority(
1756        &self,
1757        is_low_priority: bool,
1758        tag_order: Option<f64>,
1759    ) -> Result<()> {
1760        if is_low_priority {
1761            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1762
1763            self.set_tag(TagName::LowPriority, tag_info).await?;
1764
1765            if self.is_favourite() {
1766                self.remove_tag(TagName::Favorite).await?;
1767            }
1768        } else {
1769            self.remove_tag(TagName::LowPriority).await?;
1770        }
1771        Ok(())
1772    }
1773
1774    /// Sets whether this room is a DM.
1775    ///
1776    /// When setting this room as DM, it will be marked as DM for all active
1777    /// members of the room. When unsetting this room as DM, it will be
1778    /// unmarked as DM for all users, not just the members.
1779    ///
1780    /// # Arguments
1781    /// * `is_direct` - Whether to mark this room as direct.
1782    pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1783        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1784
1785        let mut content = self
1786            .client
1787            .account()
1788            .account_data::<DirectEventContent>()
1789            .await?
1790            .map(|c| c.deserialize())
1791            .transpose()?
1792            .unwrap_or_default();
1793
1794        let this_room_id = self.inner.room_id();
1795
1796        if is_direct {
1797            let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1798            room_members.retain(|member| member.user_id() != self.own_user_id());
1799
1800            for member in room_members {
1801                let entry = content.entry(member.user_id().into()).or_default();
1802                if !entry.iter().any(|room_id| room_id == this_room_id) {
1803                    entry.push(this_room_id.to_owned());
1804                }
1805            }
1806        } else {
1807            for (_, list) in content.iter_mut() {
1808                list.retain(|room_id| *room_id != this_room_id);
1809            }
1810
1811            // Remove user ids that don't have any room marked as DM
1812            content.retain(|_, list| !list.is_empty());
1813        }
1814
1815        let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1816
1817        self.client.send(request).await?;
1818        Ok(())
1819    }
1820
1821    /// Tries to decrypt a room event.
1822    ///
1823    /// # Arguments
1824    /// * `event` - The room event to be decrypted.
1825    ///
1826    /// Returns the decrypted event. In the case of a decryption error, returns
1827    /// a `TimelineEvent` representing the decryption error.
1828    #[cfg(feature = "e2e-encryption")]
1829    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1830    pub async fn decrypt_event(
1831        &self,
1832        event: &Raw<OriginalSyncRoomEncryptedEvent>,
1833        push_ctx: Option<&PushContext>,
1834    ) -> Result<TimelineEvent> {
1835        let machine = self.client.olm_machine().await;
1836        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1837
1838        match machine
1839            .try_decrypt_room_event(
1840                event.cast_ref(),
1841                self.inner.room_id(),
1842                self.client.decryption_settings(),
1843            )
1844            .await?
1845        {
1846            RoomEventDecryptionResult::Decrypted(decrypted) => {
1847                let push_actions = if let Some(push_ctx) = push_ctx {
1848                    Some(push_ctx.for_event(&decrypted.event).await)
1849                } else {
1850                    None
1851                };
1852                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1853            }
1854            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1855                self.client
1856                    .encryption()
1857                    .backups()
1858                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1859                Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1860            }
1861        }
1862    }
1863
1864    /// Tries to decrypt a room event.
1865    ///
1866    /// # Arguments
1867    /// * `event` - The room event to be decrypted.
1868    ///
1869    /// Returns the decrypted event. In the case of a decryption error, returns
1870    /// a `TimelineEvent` representing the decryption error.
1871    #[cfg(feature = "experimental-encrypted-state-events")]
1872    pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1873        &self,
1874        event: &Raw<T>,
1875        push_ctx: Option<&PushContext>,
1876    ) -> Result<TimelineEvent> {
1877        let machine = self.client.olm_machine().await;
1878        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1879
1880        match machine
1881            .try_decrypt_room_event(
1882                event.cast_ref(),
1883                self.inner.room_id(),
1884                self.client.decryption_settings(),
1885            )
1886            .await?
1887        {
1888            RoomEventDecryptionResult::Decrypted(decrypted) => {
1889                let push_actions = if let Some(push_ctx) = push_ctx {
1890                    Some(push_ctx.for_event(&decrypted.event).await)
1891                } else {
1892                    None
1893                };
1894                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1895            }
1896            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1897                self.client
1898                    .encryption()
1899                    .backups()
1900                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1901                // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1902                // event.
1903                Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1904            }
1905        }
1906    }
1907
1908    /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1909    /// session_id.
1910    ///
1911    /// This may be used when we receive an update for a session, and we want to
1912    /// reflect the changes in messages we have received that were encrypted
1913    /// with that session, e.g. to remove a warning shield because a device is
1914    /// now verified.
1915    ///
1916    /// # Arguments
1917    /// * `session_id` - The ID of the Megolm session to get information for.
1918    /// * `sender` - The (claimed) sender of the event where the session was
1919    ///   used.
1920    #[cfg(feature = "e2e-encryption")]
1921    pub async fn get_encryption_info(
1922        &self,
1923        session_id: &str,
1924        sender: &UserId,
1925    ) -> Option<Arc<EncryptionInfo>> {
1926        let machine = self.client.olm_machine().await;
1927        let machine = machine.as_ref()?;
1928        machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1929    }
1930
1931    /// Forces the currently active room key, which is used to encrypt messages,
1932    /// to be rotated.
1933    ///
1934    /// A new room key will be crated and shared with all the room members the
1935    /// next time a message will be sent. You don't have to call this method,
1936    /// room keys will be rotated automatically when necessary. This method is
1937    /// still useful for debugging purposes.
1938    ///
1939    /// For more info please take a look a the [`encryption`] module
1940    /// documentation.
1941    ///
1942    /// [`encryption`]: crate::encryption
1943    #[cfg(feature = "e2e-encryption")]
1944    pub async fn discard_room_key(&self) -> Result<()> {
1945        let machine = self.client.olm_machine().await;
1946        if let Some(machine) = machine.as_ref() {
1947            machine.discard_room_key(self.inner.room_id()).await?;
1948            Ok(())
1949        } else {
1950            Err(Error::NoOlmMachine)
1951        }
1952    }
1953
1954    /// Ban the user with `UserId` from this room.
1955    ///
1956    /// # Arguments
1957    ///
1958    /// * `user_id` - The user to ban with `UserId`.
1959    ///
1960    /// * `reason` - The reason for banning this user.
1961    #[instrument(skip_all)]
1962    pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1963        let request = assign!(
1964            ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1965            { reason: reason.map(ToOwned::to_owned) }
1966        );
1967        self.client.send(request).await?;
1968        Ok(())
1969    }
1970
1971    /// Unban the user with `UserId` from this room.
1972    ///
1973    /// # Arguments
1974    ///
1975    /// * `user_id` - The user to unban with `UserId`.
1976    ///
1977    /// * `reason` - The reason for unbanning this user.
1978    #[instrument(skip_all)]
1979    pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1980        let request = assign!(
1981            unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1982            { reason: reason.map(ToOwned::to_owned) }
1983        );
1984        self.client.send(request).await?;
1985        Ok(())
1986    }
1987
1988    /// Kick a user out of this room.
1989    ///
1990    /// # Arguments
1991    ///
1992    /// * `user_id` - The `UserId` of the user that should be kicked out of the
1993    ///   room.
1994    ///
1995    /// * `reason` - Optional reason why the room member is being kicked out.
1996    #[instrument(skip_all)]
1997    pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1998        let request = assign!(
1999            kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
2000            { reason: reason.map(ToOwned::to_owned) }
2001        );
2002        self.client.send(request).await?;
2003        Ok(())
2004    }
2005
2006    /// Invite the specified user by `UserId` to this room.
2007    ///
2008    /// # Arguments
2009    ///
2010    /// * `user_id` - The `UserId` of the user to invite to the room.
2011    #[instrument(skip_all)]
2012    pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
2013        #[cfg(feature = "e2e-encryption")]
2014        if self.client.inner.enable_share_history_on_invite {
2015            shared_room_history::share_room_history(self, user_id.to_owned()).await?;
2016        }
2017
2018        let recipient = InvitationRecipient::UserId(InviteUserId::new(user_id.to_owned()));
2019        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2020        self.client.send(request).await?;
2021
2022        // Force a future room members reload before sending any event to prevent UTDs
2023        // that can happen when some event is sent after a room member has been invited
2024        // but before the /sync request could fetch the membership change event.
2025        self.mark_members_missing();
2026
2027        Ok(())
2028    }
2029
2030    /// Invite the specified user by third party id to this room.
2031    ///
2032    /// # Arguments
2033    ///
2034    /// * `invite_id` - A third party id of a user to invite to the room.
2035    #[instrument(skip_all)]
2036    pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
2037        let recipient = InvitationRecipient::ThirdPartyId(invite_id);
2038        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2039        self.client.send(request).await?;
2040
2041        // Force a future room members reload before sending any event to prevent UTDs
2042        // that can happen when some event is sent after a room member has been invited
2043        // but before the /sync request could fetch the membership change event.
2044        self.mark_members_missing();
2045
2046        Ok(())
2047    }
2048
2049    /// Activate typing notice for this room.
2050    ///
2051    /// The typing notice remains active for 4s. It can be deactivate at any
2052    /// point by setting typing to `false`. If this method is called while
2053    /// the typing notice is active nothing will happen. This method can be
2054    /// called on every key stroke, since it will do nothing while typing is
2055    /// active.
2056    ///
2057    /// # Arguments
2058    ///
2059    /// * `typing` - Whether the user is typing or has stopped typing.
2060    ///
2061    /// # Examples
2062    ///
2063    /// ```no_run
2064    /// use std::time::Duration;
2065    ///
2066    /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
2067    /// # use matrix_sdk::{
2068    /// #     Client, config::SyncSettings,
2069    /// #     ruma::room_id,
2070    /// # };
2071    /// # use url::Url;
2072    ///
2073    /// # async {
2074    /// # let homeserver = Url::parse("http://localhost:8080")?;
2075    /// # let client = Client::new(homeserver).await?;
2076    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2077    ///
2078    /// if let Some(room) = client.get_room(&room_id) {
2079    ///     room.typing_notice(true).await?
2080    /// }
2081    /// # anyhow::Ok(()) };
2082    /// ```
2083    pub async fn typing_notice(&self, typing: bool) -> Result<()> {
2084        self.ensure_room_joined()?;
2085
2086        // Only send a request to the homeserver if the old timeout has elapsed
2087        // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
2088        let send = if let Some(typing_time) =
2089            self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
2090        {
2091            if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
2092                // We always reactivate the typing notice if typing is true or
2093                // we may need to deactivate it if it's
2094                // currently active if typing is false
2095                typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
2096            } else {
2097                // Only send a request when we need to deactivate typing
2098                !typing
2099            }
2100        } else {
2101            // Typing notice is currently deactivated, therefore, send a request
2102            // only when it's about to be activated
2103            typing
2104        };
2105
2106        if send {
2107            self.send_typing_notice(typing).await?;
2108        }
2109
2110        Ok(())
2111    }
2112
2113    #[instrument(name = "typing_notice", skip(self))]
2114    async fn send_typing_notice(&self, typing: bool) -> Result<()> {
2115        let typing = if typing {
2116            self.client
2117                .inner
2118                .typing_notice_times
2119                .write()
2120                .unwrap()
2121                .insert(self.room_id().to_owned(), Instant::now());
2122            Typing::Yes(TypingInfo::new(TYPING_NOTICE_TIMEOUT))
2123        } else {
2124            self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
2125            Typing::No
2126        };
2127
2128        let request = create_typing_event::v3::Request::new(
2129            self.own_user_id().to_owned(),
2130            self.room_id().to_owned(),
2131            typing,
2132        );
2133
2134        self.client.send(request).await?;
2135
2136        Ok(())
2137    }
2138
2139    /// Send a request to set a single receipt.
2140    ///
2141    /// If an unthreaded receipt is sent, this will also unset the unread flag
2142    /// of the room if necessary.
2143    ///
2144    /// # Arguments
2145    ///
2146    /// * `receipt_type` - The type of the receipt to set. Note that it is
2147    ///   possible to set the fully-read marker although it is technically not a
2148    ///   receipt.
2149    ///
2150    /// * `thread` - The thread where this receipt should apply, if any. Note
2151    ///   that this must be [`ReceiptThread::Unthreaded`] when sending a
2152    ///   [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
2153    ///
2154    /// * `event_id` - The `EventId` of the event to set the receipt on.
2155    #[instrument(skip_all)]
2156    pub async fn send_single_receipt(
2157        &self,
2158        receipt_type: create_receipt::v3::ReceiptType,
2159        thread: ReceiptThread,
2160        event_id: OwnedEventId,
2161    ) -> Result<()> {
2162        // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
2163        // string key.
2164        let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2165
2166        self.client
2167            .inner
2168            .locks
2169            .read_receipt_deduplicated_handler
2170            .run((request_key, event_id.clone()), async {
2171                // We will unset the unread flag if we send an unthreaded receipt.
2172                let is_unthreaded = thread == ReceiptThread::Unthreaded;
2173
2174                let mut request = create_receipt::v3::Request::new(
2175                    self.room_id().to_owned(),
2176                    receipt_type,
2177                    event_id,
2178                );
2179                request.thread = thread;
2180
2181                self.client.send(request).await?;
2182
2183                if is_unthreaded {
2184                    self.set_unread_flag(false).await?;
2185                }
2186
2187                Ok(())
2188            })
2189            .await
2190    }
2191
2192    /// Send a request to set multiple receipts at once.
2193    ///
2194    /// This will also unset the unread flag of the room if necessary.
2195    ///
2196    /// # Arguments
2197    ///
2198    /// * `receipts` - The `Receipts` to send.
2199    ///
2200    /// If `receipts` is empty, this is a no-op.
2201    #[instrument(skip_all)]
2202    pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2203        if receipts.is_empty() {
2204            return Ok(());
2205        }
2206
2207        let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2208        let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2209            fully_read,
2210            read_receipt: public_read_receipt,
2211            private_read_receipt,
2212        });
2213
2214        self.client.send(request).await?;
2215
2216        self.set_unread_flag(false).await?;
2217
2218        Ok(())
2219    }
2220
2221    /// Helper function to enable End-to-end encryption in this room.
2222    /// `encrypted_state_events` is not used unless the
2223    /// `experimental-encrypted-state-events` feature is enabled.
2224    #[allow(unused_variables, unused_mut)]
2225    async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2226        use ruma::{
2227            EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2228        };
2229        const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2230
2231        if !self.latest_encryption_state().await?.is_encrypted() {
2232            let mut content =
2233                RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2234            #[cfg(feature = "experimental-encrypted-state-events")]
2235            if encrypted_state_events {
2236                content = content.with_encrypted_state();
2237            }
2238            self.send_state_event(content).await?;
2239
2240            // Spin on the sync beat event, since the first sync we receive might not
2241            // include the encryption event.
2242            //
2243            // TODO do we want to return an error here if we time out? This
2244            // could be quite useful if someone wants to enable encryption and
2245            // send a message right after it's enabled.
2246            let res = timeout(
2247                async {
2248                    loop {
2249                        // Listen for sync events, then check if the encryption state is known.
2250                        self.client.inner.sync_beat.listen().await;
2251                        let _state_store_lock =
2252                            self.client.base_client().state_store_lock().lock().await;
2253
2254                        if !self.inner.encryption_state().is_unknown() {
2255                            break;
2256                        }
2257                    }
2258                },
2259                SYNC_WAIT_TIME,
2260            )
2261            .await;
2262
2263            let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2264
2265            // If encryption was enabled, return.
2266            #[cfg(not(feature = "experimental-encrypted-state-events"))]
2267            if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2268                debug!("room successfully marked as encrypted");
2269                return Ok(());
2270            }
2271
2272            // If encryption with state event encryption was enabled, return.
2273            #[cfg(feature = "experimental-encrypted-state-events")]
2274            if res.is_ok() && {
2275                if encrypted_state_events {
2276                    self.inner.encryption_state().is_state_encrypted()
2277                } else {
2278                    self.inner.encryption_state().is_encrypted()
2279                }
2280            } {
2281                debug!("room successfully marked as encrypted");
2282                return Ok(());
2283            }
2284
2285            // If after waiting for multiple syncs, we don't have the encryption state we
2286            // expect, assume the local encryption state is incorrect; this will
2287            // cause the SDK to re-request it later for confirmation, instead of
2288            // assuming it's sync'd and correct (and not encrypted).
2289            debug!("still not marked as encrypted, marking encryption state as missing");
2290
2291            let mut room_info = self.clone_info();
2292            room_info.mark_encryption_state_missing();
2293            let mut changes = StateChanges::default();
2294            changes.add_room(room_info.clone());
2295
2296            self.client.state_store().save_changes(&changes).await?;
2297            self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2298        }
2299
2300        Ok(())
2301    }
2302
2303    /// Enable End-to-end encryption in this room.
2304    ///
2305    /// This method will be a noop if encryption is already enabled, otherwise
2306    /// sends a `m.room.encryption` state event to the room. This might fail if
2307    /// you don't have the appropriate power level to enable end-to-end
2308    /// encryption.
2309    ///
2310    /// A sync needs to be received to update the local room state. This method
2311    /// will wait for a sync to be received, this might time out if no
2312    /// sync loop is running or if the server is slow.
2313    ///
2314    /// # Examples
2315    ///
2316    /// ```no_run
2317    /// # use matrix_sdk::{
2318    /// #     Client, config::SyncSettings,
2319    /// #     ruma::room_id,
2320    /// # };
2321    /// # use url::Url;
2322    /// #
2323    /// # async {
2324    /// # let homeserver = Url::parse("http://localhost:8080")?;
2325    /// # let client = Client::new(homeserver).await?;
2326    /// # let room_id = room_id!("!test:localhost");
2327    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2328    ///
2329    /// if let Some(room) = client.get_room(&room_id) {
2330    ///     room.enable_encryption().await?
2331    /// }
2332    /// # anyhow::Ok(()) };
2333    /// ```
2334    #[instrument(skip_all)]
2335    pub async fn enable_encryption(&self) -> Result<()> {
2336        self.enable_encryption_inner(false).await
2337    }
2338
2339    /// Enable End-to-end encryption in this room, opting into experimental
2340    /// state event encryption.
2341    ///
2342    /// This method will be a noop if encryption is already enabled, otherwise
2343    /// sends a `m.room.encryption` state event to the room. This might fail if
2344    /// you don't have the appropriate power level to enable end-to-end
2345    /// encryption.
2346    ///
2347    /// A sync needs to be received to update the local room state. This method
2348    /// will wait for a sync to be received, this might time out if no
2349    /// sync loop is running or if the server is slow.
2350    ///
2351    /// # Examples
2352    ///
2353    /// ```no_run
2354    /// # use matrix_sdk::{
2355    /// #     Client, config::SyncSettings,
2356    /// #     ruma::room_id,
2357    /// # };
2358    /// # use url::Url;
2359    /// #
2360    /// # async {
2361    /// # let homeserver = Url::parse("http://localhost:8080")?;
2362    /// # let client = Client::new(homeserver).await?;
2363    /// # let room_id = room_id!("!test:localhost");
2364    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2365    ///
2366    /// if let Some(room) = client.get_room(&room_id) {
2367    ///     room.enable_encryption_with_state_event_encryption().await?
2368    /// }
2369    /// # anyhow::Ok(()) };
2370    /// ```
2371    #[instrument(skip_all)]
2372    #[cfg(feature = "experimental-encrypted-state-events")]
2373    pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2374        self.enable_encryption_inner(true).await
2375    }
2376
2377    /// Share a room key with users in the given room.
2378    ///
2379    /// This will create Olm sessions with all the users/device pairs in the
2380    /// room if necessary and share a room key that can be shared with them.
2381    ///
2382    /// Does nothing if no room key needs to be shared.
2383    // TODO: expose this publicly so people can pre-share a group session if
2384    // e.g. a user starts to type a message for a room.
2385    #[cfg(feature = "e2e-encryption")]
2386    #[instrument(skip_all, fields(room_id = ?self.room_id()))]
2387    async fn preshare_room_key(&self) -> Result<()> {
2388        self.ensure_room_joined()?;
2389
2390        // Take and release the lock on the store, if needs be.
2391        let _guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2392
2393        self.client
2394            .locks()
2395            .group_session_deduplicated_handler
2396            .run(self.room_id().to_owned(), async move {
2397                {
2398                    let members = self
2399                        .client
2400                        .state_store()
2401                        .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2402                        .await?;
2403                    self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2404                };
2405
2406                let response = self.share_room_key().await;
2407
2408                // If one of the responses failed invalidate the group
2409                // session as using it would end up in undecryptable
2410                // messages.
2411                if let Err(r) = response {
2412                    let machine = self.client.olm_machine().await;
2413                    if let Some(machine) = machine.as_ref() {
2414                        machine.discard_room_key(self.room_id()).await?;
2415                    }
2416                    return Err(r);
2417                }
2418
2419                Ok(())
2420            })
2421            .await
2422    }
2423
2424    /// Share a group session for a room.
2425    ///
2426    /// # Panics
2427    ///
2428    /// Panics if the client isn't logged in.
2429    #[cfg(feature = "e2e-encryption")]
2430    #[instrument(skip_all)]
2431    async fn share_room_key(&self) -> Result<()> {
2432        self.ensure_room_joined()?;
2433
2434        let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2435
2436        for request in requests {
2437            let response = self.client.send_to_device(&request).await?;
2438            self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2439        }
2440
2441        Ok(())
2442    }
2443
2444    /// Wait for the room to be fully synced.
2445    ///
2446    /// This method makes sure the room that was returned when joining a room
2447    /// has been echoed back in the sync.
2448    ///
2449    /// Warning: This waits until a sync happens and does not return if no sync
2450    /// is happening. It can also return early when the room is not a joined
2451    /// room anymore.
2452    #[instrument(skip_all)]
2453    pub async fn sync_up(&self) {
2454        while !self.is_synced() && self.state() == RoomState::Joined {
2455            let wait_for_beat = self.client.inner.sync_beat.listen();
2456            // We don't care whether it's a timeout or a sync beat.
2457            let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2458        }
2459    }
2460
2461    /// Send a message-like event to this room.
2462    ///
2463    /// Returns the parsed response from the server.
2464    ///
2465    /// If the encryption feature is enabled this method will transparently
2466    /// encrypt the event if this room is encrypted (except for `m.reaction`
2467    /// events, which are never encrypted).
2468    ///
2469    /// **Note**: If you just want to send an event with custom JSON content to
2470    /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2471    ///
2472    /// If you want to set a transaction ID for the event, use
2473    /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2474    /// on the returned value before `.await`ing it.
2475    ///
2476    /// # Arguments
2477    ///
2478    /// * `content` - The content of the message event.
2479    ///
2480    /// # Examples
2481    ///
2482    /// ```no_run
2483    /// # use std::sync::{Arc, RwLock};
2484    /// # use matrix_sdk::{Client, config::SyncSettings};
2485    /// # use url::Url;
2486    /// # use matrix_sdk::ruma::room_id;
2487    /// # use serde::{Deserialize, Serialize};
2488    /// use matrix_sdk::ruma::{
2489    ///     MilliSecondsSinceUnixEpoch, TransactionId,
2490    ///     events::{
2491    ///         macros::EventContent,
2492    ///         room::message::{RoomMessageEventContent, TextMessageEventContent},
2493    ///     },
2494    ///     uint,
2495    /// };
2496    ///
2497    /// # async {
2498    /// # let homeserver = Url::parse("http://localhost:8080")?;
2499    /// # let mut client = Client::new(homeserver).await?;
2500    /// # let room_id = room_id!("!test:localhost");
2501    /// let content = RoomMessageEventContent::text_plain("Hello world");
2502    /// let txn_id = TransactionId::new();
2503    ///
2504    /// if let Some(room) = client.get_room(&room_id) {
2505    ///     room.send(content).with_transaction_id(txn_id).await?;
2506    /// }
2507    ///
2508    /// // Custom events work too:
2509    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2510    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2511    /// struct TokenEventContent {
2512    ///     token: String,
2513    ///     #[serde(rename = "exp")]
2514    ///     expires_at: MilliSecondsSinceUnixEpoch,
2515    /// }
2516    ///
2517    /// # fn generate_token() -> String { todo!() }
2518    /// let content = TokenEventContent {
2519    ///     token: generate_token(),
2520    ///     expires_at: {
2521    ///         let now = MilliSecondsSinceUnixEpoch::now();
2522    ///         MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2523    ///     },
2524    /// };
2525    ///
2526    /// if let Some(room) = client.get_room(&room_id) {
2527    ///     room.send(content).await?;
2528    /// }
2529    /// # anyhow::Ok(()) };
2530    /// ```
2531    pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2532        SendMessageLikeEvent::new(self, content)
2533    }
2534
2535    /// Run /keys/query requests for all the non-tracked users, and for users
2536    /// with an out-of-date device list.
2537    #[cfg(feature = "e2e-encryption")]
2538    async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2539        let olm = self.client.olm_machine().await;
2540        let olm = olm.as_ref().expect("Olm machine wasn't started");
2541
2542        let members =
2543            self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2544
2545        let tracked: HashMap<_, _> = olm
2546            .store()
2547            .load_tracked_users()
2548            .await?
2549            .into_iter()
2550            .map(|tracked| (tracked.user_id, tracked.dirty))
2551            .collect();
2552
2553        // A member has no unknown devices iff it was tracked *and* the tracking is
2554        // not considered dirty.
2555        let members_with_unknown_devices =
2556            members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2557
2558        let (req_id, request) =
2559            olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2560
2561        if !request.device_keys.is_empty() {
2562            self.client.keys_query(&req_id, request.device_keys).await?;
2563        }
2564
2565        Ok(())
2566    }
2567
2568    /// Send a message-like event with custom JSON content to this room.
2569    ///
2570    /// Returns the parsed response from the server.
2571    ///
2572    /// If the encryption feature is enabled this method will transparently
2573    /// encrypt the event if this room is encrypted (except for `m.reaction`
2574    /// events, which are never encrypted).
2575    ///
2576    /// This method is equivalent to the [`send()`][Self::send] method but
2577    /// allows sending custom JSON payloads, e.g. constructed using the
2578    /// [`serde_json::json!()`] macro.
2579    ///
2580    /// If you want to set a transaction ID for the event, use
2581    /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2582    /// on the returned value before `.await`ing it.
2583    ///
2584    /// # Arguments
2585    ///
2586    /// * `event_type` - The type of the event.
2587    ///
2588    /// * `content` - The content of the event as a raw JSON value. The argument
2589    ///   type can be `serde_json::Value`, but also other raw JSON types; for
2590    ///   the full list check the documentation of
2591    ///   [`IntoRawMessageLikeEventContent`].
2592    ///
2593    /// # Examples
2594    ///
2595    /// ```no_run
2596    /// # use std::sync::{Arc, RwLock};
2597    /// # use matrix_sdk::{Client, config::SyncSettings};
2598    /// # use url::Url;
2599    /// # use matrix_sdk::ruma::room_id;
2600    /// # async {
2601    /// # let homeserver = Url::parse("http://localhost:8080")?;
2602    /// # let mut client = Client::new(homeserver).await?;
2603    /// # let room_id = room_id!("!test:localhost");
2604    /// use serde_json::json;
2605    ///
2606    /// if let Some(room) = client.get_room(&room_id) {
2607    ///     room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2608    /// }
2609    /// # anyhow::Ok(()) };
2610    /// ```
2611    #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2612    pub fn send_raw<'a>(
2613        &'a self,
2614        event_type: &'a str,
2615        content: impl IntoRawMessageLikeEventContent,
2616    ) -> SendRawMessageLikeEvent<'a> {
2617        // Note: the recorded instrument fields are saved in
2618        // `SendRawMessageLikeEvent::into_future`.
2619        SendRawMessageLikeEvent::new(self, event_type, content)
2620    }
2621
2622    /// Send an attachment to this room.
2623    ///
2624    /// This will upload the given data that the reader produces using the
2625    /// [`upload()`] method and post an event to the given room.
2626    /// If the room is encrypted and the encryption feature is enabled the
2627    /// upload will be encrypted.
2628    ///
2629    /// This is a convenience method that calls the
2630    /// [`upload()`] and afterwards the [`send()`].
2631    ///
2632    /// # Arguments
2633    /// * `filename` - The file name.
2634    ///
2635    /// * `content_type` - The type of the media, this will be used as the
2636    /// content-type header.
2637    ///
2638    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2639    /// media.
2640    ///
2641    /// * `config` - Metadata and configuration for the attachment.
2642    ///
2643    /// # Examples
2644    ///
2645    /// ```no_run
2646    /// # use std::fs;
2647    /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2648    /// # use url::Url;
2649    /// # use mime;
2650    /// # async {
2651    /// # let homeserver = Url::parse("http://localhost:8080")?;
2652    /// # let mut client = Client::new(homeserver).await?;
2653    /// # let room_id = room_id!("!test:localhost");
2654    /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2655    ///
2656    /// if let Some(room) = client.get_room(&room_id) {
2657    ///     room.send_attachment(
2658    ///         "my_favorite_cat.jpg",
2659    ///         &mime::IMAGE_JPEG,
2660    ///         image,
2661    ///         AttachmentConfig::new(),
2662    ///     ).await?;
2663    /// }
2664    /// # anyhow::Ok(()) };
2665    /// ```
2666    ///
2667    /// [`upload()`]: crate::Media::upload
2668    /// [`send()`]: Self::send
2669    #[instrument(skip_all)]
2670    pub fn send_attachment<'a>(
2671        &'a self,
2672        filename: impl Into<String>,
2673        content_type: &'a Mime,
2674        data: Vec<u8>,
2675        config: AttachmentConfig,
2676    ) -> SendAttachment<'a> {
2677        SendAttachment::new(self, filename.into(), content_type, data, config)
2678    }
2679
2680    /// Prepare and send an attachment to this room.
2681    ///
2682    /// This will upload the given data that the reader produces using the
2683    /// [`upload()`](#method.upload) method and post an event to the given room.
2684    /// If the room is encrypted and the encryption feature is enabled the
2685    /// upload will be encrypted.
2686    ///
2687    /// This is a convenience method that calls the
2688    /// [`Client::upload()`](#Client::method.upload) and afterwards the
2689    /// [`send()`](#method.send).
2690    ///
2691    /// # Arguments
2692    /// * `filename` - The file name.
2693    ///
2694    /// * `content_type` - The type of the media, this will be used as the
2695    ///   content-type header.
2696    ///
2697    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2698    ///   media.
2699    ///
2700    /// * `config` - Metadata and configuration for the attachment.
2701    ///
2702    /// * `send_progress` - An observable to transmit forward progress about the
2703    ///   upload.
2704    ///
2705    /// * `store_in_cache` - A boolean defining whether the uploaded media will
2706    ///   be stored in the cache immediately after a successful upload.
2707    #[instrument(skip_all)]
2708    pub(super) async fn prepare_and_send_attachment<'a>(
2709        &'a self,
2710        filename: String,
2711        content_type: &'a Mime,
2712        data: Vec<u8>,
2713        mut config: AttachmentConfig,
2714        send_progress: SharedObservable<TransmissionProgress>,
2715        store_in_cache: bool,
2716    ) -> Result<send_message_event::v3::Response> {
2717        self.ensure_room_joined()?;
2718
2719        let txn_id = config.txn_id.take();
2720        let mentions = config.mentions.take();
2721
2722        let thumbnail = config.thumbnail.take();
2723
2724        // If necessary, store caching data for the thumbnail ahead of time.
2725        let thumbnail_cache_info = if store_in_cache {
2726            thumbnail
2727                .as_ref()
2728                .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2729        } else {
2730            None
2731        };
2732
2733        #[cfg(feature = "e2e-encryption")]
2734        let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2735            self.client
2736                .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2737                .await?
2738        } else {
2739            self.client
2740                .media()
2741                .upload_plain_media_and_thumbnail(
2742                    content_type,
2743                    // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2744                    // similar.
2745                    data.clone(),
2746                    thumbnail,
2747                    send_progress,
2748                )
2749                .await?
2750        };
2751
2752        #[cfg(not(feature = "e2e-encryption"))]
2753        let (media_source, thumbnail) = self
2754            .client
2755            .media()
2756            .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2757            .await?;
2758
2759        if store_in_cache {
2760            let media_store_lock_guard = self.client.media_store().lock().await?;
2761
2762            // A failure to cache shouldn't prevent the whole upload from finishing
2763            // properly, so only log errors during caching.
2764
2765            debug!("caching the media");
2766            let request =
2767                MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2768
2769            if let Err(err) = media_store_lock_guard
2770                .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2771                .await
2772            {
2773                warn!("unable to cache the media after uploading it: {err}");
2774            }
2775
2776            if let Some(((data, height, width), source)) =
2777                thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2778            {
2779                debug!("caching the thumbnail");
2780
2781                let request = MediaRequestParameters {
2782                    source: source.clone(),
2783                    format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2784                };
2785
2786                if let Err(err) = media_store_lock_guard
2787                    .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2788                    .await
2789                {
2790                    warn!("unable to cache the media after uploading it: {err}");
2791                }
2792            }
2793        }
2794
2795        let content = self
2796            .make_media_event(
2797                Room::make_attachment_type(
2798                    content_type,
2799                    filename,
2800                    media_source,
2801                    config.caption,
2802                    config.info,
2803                    thumbnail,
2804                ),
2805                mentions,
2806                config.reply,
2807            )
2808            .await?;
2809
2810        let mut fut = self.send(content);
2811        if let Some(txn_id) = txn_id {
2812            fut = fut.with_transaction_id(txn_id);
2813        }
2814
2815        fut.await.map(|result| result.response)
2816    }
2817
2818    /// Creates the inner [`MessageType`] for an already-uploaded media file
2819    /// provided by its source.
2820    #[allow(clippy::too_many_arguments)]
2821    pub(crate) fn make_attachment_type(
2822        content_type: &Mime,
2823        filename: String,
2824        source: MediaSource,
2825        caption: Option<TextMessageEventContent>,
2826        info: Option<AttachmentInfo>,
2827        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2828    ) -> MessageType {
2829        make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2830    }
2831
2832    /// Creates the [`RoomMessageEventContent`] based on the message type,
2833    /// mentions and reply information.
2834    pub(crate) async fn make_media_event(
2835        &self,
2836        msg_type: MessageType,
2837        mentions: Option<Mentions>,
2838        reply: Option<Reply>,
2839    ) -> Result<RoomMessageEventContent> {
2840        let mut content = RoomMessageEventContent::new(msg_type);
2841        if let Some(mentions) = mentions {
2842            content = content.add_mentions(mentions);
2843        }
2844        if let Some(reply) = reply {
2845            // Since we just created the event, there is no relation attached to it. Thus,
2846            // it is safe to add the reply relation without overriding anything.
2847            content = self.make_reply_event(content.into(), reply).await?;
2848        }
2849        Ok(content)
2850    }
2851
2852    /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
2853    /// provided by its source.
2854    #[cfg(feature = "unstable-msc4274")]
2855    #[allow(clippy::too_many_arguments)]
2856    pub(crate) fn make_gallery_item_type(
2857        content_type: &Mime,
2858        filename: String,
2859        source: MediaSource,
2860        caption: Option<TextMessageEventContent>,
2861        info: Option<AttachmentInfo>,
2862        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2863    ) -> GalleryItemType {
2864        make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2865    }
2866
2867    /// Update the power levels of a select set of users of this room.
2868    ///
2869    /// Issue a `power_levels` state event request to the server, changing the
2870    /// given UserId -> Int levels. May fail if the `power_levels` aren't
2871    /// locally known yet or the server rejects the state event update, e.g.
2872    /// because of insufficient permissions. Neither permissions to update
2873    /// nor whether the data might be stale is checked prior to issuing the
2874    /// request.
2875    pub async fn update_power_levels(
2876        &self,
2877        updates: Vec<(&UserId, Int)>,
2878    ) -> Result<send_state_event::v3::Response> {
2879        let mut power_levels = self.power_levels().await?;
2880
2881        for (user_id, new_level) in updates {
2882            if new_level == power_levels.users_default {
2883                power_levels.users.remove(user_id);
2884            } else {
2885                power_levels.users.insert(user_id.to_owned(), new_level);
2886            }
2887        }
2888
2889        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2890    }
2891
2892    /// Applies a set of power level changes to this room.
2893    ///
2894    /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2895    /// remain unchanged.
2896    pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2897        let mut power_levels = self.power_levels().await?;
2898        power_levels.apply(changes)?;
2899        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2900        Ok(())
2901    }
2902
2903    /// Resets the room's power levels to the default values
2904    ///
2905    /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2906    pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2907        let creators = self.creators().unwrap_or_default();
2908        let rules = self.clone_info().room_version_rules_or_default();
2909
2910        let default_power_levels =
2911            RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2912        let changes = RoomPowerLevelChanges::from(default_power_levels);
2913        self.apply_power_level_changes(changes).await?;
2914        Ok(self.power_levels().await?)
2915    }
2916
2917    /// Gets the suggested role for the user with the provided `user_id`.
2918    ///
2919    /// This method checks the `RoomPowerLevels` events instead of loading the
2920    /// member list and looking for the member.
2921    pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2922        let power_level = self.get_user_power_level(user_id).await?;
2923        Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2924    }
2925
2926    /// Gets the power level the user with the provided `user_id`.
2927    ///
2928    /// This method checks the `RoomPowerLevels` events instead of loading the
2929    /// member list and looking for the member.
2930    pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2931        let event = self.power_levels().await?;
2932        Ok(event.for_user(user_id))
2933    }
2934
2935    /// Gets a map with the `UserId` of users with power levels other than `0`
2936    /// and this power level.
2937    pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2938        let power_levels = self.power_levels().await.ok();
2939        let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2940        if let Some(power_levels) = power_levels {
2941            for (id, level) in power_levels.users.into_iter() {
2942                user_power_levels.insert(id, level.into());
2943            }
2944        }
2945        user_power_levels
2946    }
2947
2948    /// Sets the name of this room.
2949    pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2950        self.send_state_event(RoomNameEventContent::new(name)).await
2951    }
2952
2953    /// Sets a new topic for this room.
2954    pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2955        self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2956    }
2957
2958    /// Sets the new avatar url for this room.
2959    ///
2960    /// # Arguments
2961    /// * `avatar_url` - The owned Matrix uri that represents the avatar
2962    /// * `info` - The optional image info that can be provided for the avatar
2963    pub async fn set_avatar_url(
2964        &self,
2965        url: &MxcUri,
2966        info: Option<avatar::ImageInfo>,
2967    ) -> Result<send_state_event::v3::Response> {
2968        self.ensure_room_joined()?;
2969
2970        let mut room_avatar_event = RoomAvatarEventContent::new();
2971        room_avatar_event.url = Some(url.to_owned());
2972        room_avatar_event.info = info.map(Box::new);
2973
2974        self.send_state_event(room_avatar_event).await
2975    }
2976
2977    /// Removes the avatar from the room
2978    pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2979        self.send_state_event(RoomAvatarEventContent::new()).await
2980    }
2981
2982    /// Uploads a new avatar for this room.
2983    ///
2984    /// # Arguments
2985    /// * `mime` - The mime type describing the data
2986    /// * `data` - The data representation of the avatar
2987    /// * `info` - The optional image info provided for the avatar, the blurhash
2988    ///   and the mimetype will always be updated
2989    pub async fn upload_avatar(
2990        &self,
2991        mime: &Mime,
2992        data: Vec<u8>,
2993        info: Option<avatar::ImageInfo>,
2994    ) -> Result<send_state_event::v3::Response> {
2995        self.ensure_room_joined()?;
2996
2997        let upload_response = self.client.media().upload(mime, data, None).await?;
2998        let mut info = info.unwrap_or_default();
2999        info.blurhash = upload_response.blurhash;
3000        info.mimetype = Some(mime.to_string());
3001
3002        self.set_avatar_url(&upload_response.content_uri, Some(info)).await
3003    }
3004
3005    /// Send a state event with an empty state key to the homeserver.
3006    ///
3007    /// For state events with a non-empty state key, see
3008    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3009    ///
3010    /// Returns the parsed response from the server.
3011    ///
3012    /// # Arguments
3013    ///
3014    /// * `content` - The content of the state event.
3015    ///
3016    /// # Examples
3017    ///
3018    /// ```no_run
3019    /// # use serde::{Deserialize, Serialize};
3020    /// # async {
3021    /// # let joined_room: matrix_sdk::Room = todo!();
3022    /// use matrix_sdk::ruma::{
3023    ///     EventEncryptionAlgorithm,
3024    ///     events::{
3025    ///         EmptyStateKey, macros::EventContent,
3026    ///         room::encryption::RoomEncryptionEventContent,
3027    ///     },
3028    /// };
3029    ///
3030    /// let encryption_event_content = RoomEncryptionEventContent::new(
3031    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
3032    /// );
3033    /// joined_room.send_state_event(encryption_event_content).await?;
3034    ///
3035    /// // Custom event:
3036    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3037    /// #[ruma_event(
3038    ///     type = "org.matrix.msc_9000.xxx",
3039    ///     kind = State,
3040    ///     state_key_type = EmptyStateKey,
3041    /// )]
3042    /// struct XxxStateEventContent {/* fields... */}
3043    ///
3044    /// let content: XxxStateEventContent = todo!();
3045    /// joined_room.send_state_event(content).await?;
3046    /// # anyhow::Ok(()) };
3047    /// ```
3048    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3049    #[instrument(skip_all)]
3050    pub async fn send_state_event(
3051        &self,
3052        content: impl StateEventContent<StateKey = EmptyStateKey>,
3053    ) -> Result<send_state_event::v3::Response> {
3054        self.send_state_event_for_key(&EmptyStateKey, content).await
3055    }
3056
/// Send a state event whose state key is empty to the homeserver.
///
/// State events that carry a non-empty state key go through
/// [`send_state_event_for_key`][Self::send_state_event_for_key] instead.
///
/// With the experimental state event encryption feature enabled, the event
/// is transparently encrypted when this room is encrypted, unless its event
/// type is considered critical for the room to function, as outlined in
/// [MSC4362][msc4362].
///
/// This simply forwards to
/// [`send_state_event_for_key`][Self::send_state_event_for_key] with an
/// [`EmptyStateKey`]; as the example shows, the returned value is awaited to
/// obtain the parsed response from the server.
///
/// # Arguments
///
/// * `content` - The content of the state event.
///
/// # Examples
///
/// ```no_run
/// # use serde::{Deserialize, Serialize};
/// # async {
/// # let joined_room: matrix_sdk::Room = todo!();
/// use matrix_sdk::ruma::{
///     EventEncryptionAlgorithm,
///     events::{
///         EmptyStateKey, macros::EventContent,
///         room::encryption::RoomEncryptionEventContent,
///     },
/// };
///
/// let encryption_event_content = RoomEncryptionEventContent::new(
///     EventEncryptionAlgorithm::MegolmV1AesSha2,
/// );
/// joined_room.send_state_event(encryption_event_content).await?;
///
/// // Custom event:
/// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
/// #[ruma_event(
///     type = "org.matrix.msc_9000.xxx",
///     kind = State,
///     state_key_type = EmptyStateKey,
/// )]
/// struct XxxStateEventContent {/* fields... */}
///
/// let content: XxxStateEventContent = todo!();
/// joined_room.send_state_event(content).await?;
/// # anyhow::Ok(()) };
/// ```
///
/// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/4362-encrypted-state.md
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event<'a>(
    &'a self,
    content: impl StateEventContent<StateKey = EmptyStateKey>,
) -> SendStateEvent<'a> {
    let state_key = EmptyStateKey;
    self.send_state_event_for_key(&state_key, content)
}
3115
3116    /// Send a state event to the homeserver.
3117    ///
3118    /// Returns the parsed response from the server.
3119    ///
3120    /// # Arguments
3121    ///
3122    /// * `content` - The content of the state event.
3123    ///
3124    /// * `state_key` - A unique key which defines the overwriting semantics for
3125    ///   this piece of room state.
3126    ///
3127    /// # Examples
3128    ///
3129    /// ```no_run
3130    /// # use serde::{Deserialize, Serialize};
3131    /// # async {
3132    /// # let joined_room: matrix_sdk::Room = todo!();
3133    /// use matrix_sdk::ruma::{
3134    ///     events::{
3135    ///         macros::EventContent,
3136    ///         room::member::{RoomMemberEventContent, MembershipState},
3137    ///     },
3138    ///     mxc_uri,
3139    /// };
3140    ///
3141    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3142    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3143    /// content.avatar_url = Some(avatar_url);
3144    ///
3145    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3146    ///
3147    /// // Custom event:
3148    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3149    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3150    /// struct XxxStateEventContent { /* fields... */ }
3151    ///
3152    /// let content: XxxStateEventContent = todo!();
3153    /// joined_room.send_state_event_for_key("foo", content).await?;
3154    /// # anyhow::Ok(()) };
3155    /// ```
3156    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3157    pub async fn send_state_event_for_key<C, K>(
3158        &self,
3159        state_key: &K,
3160        content: C,
3161    ) -> Result<send_state_event::v3::Response>
3162    where
3163        C: StateEventContent,
3164        C::StateKey: Borrow<K>,
3165        K: AsRef<str> + ?Sized,
3166    {
3167        self.ensure_room_joined()?;
3168        let request =
3169            send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3170        let response = self.client.send(request).await?;
3171        Ok(response)
3172    }
3173
/// Send a state event with the given state key to the homeserver.
///
/// With the experimental state event encryption feature enabled, the event
/// is transparently encrypted when this room is encrypted, unless its event
/// type is considered critical for the room to function, as outlined in
/// [MSC4362][msc4362].
///
/// The call itself only builds a [`SendStateEvent`] from the room, the state
/// key and the content; as the example shows, that value is awaited to
/// obtain the parsed response from the server.
///
/// # Arguments
///
/// * `state_key` - A unique key which defines the overwriting semantics for
///   this piece of room state.
///
/// * `content` - The content of the state event.
///
/// # Examples
///
/// ```no_run
/// # use serde::{Deserialize, Serialize};
/// # async {
/// # let joined_room: matrix_sdk::Room = todo!();
/// use matrix_sdk::ruma::{
///     events::{
///         macros::EventContent,
///         room::member::{RoomMemberEventContent, MembershipState},
///     },
///     mxc_uri,
/// };
///
/// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
/// let mut content = RoomMemberEventContent::new(MembershipState::Join);
/// content.avatar_url = Some(avatar_url);
///
/// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
///
/// // Custom event:
/// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
/// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
/// struct XxxStateEventContent { /* fields... */ }
///
/// let content: XxxStateEventContent = todo!();
/// joined_room.send_state_event_for_key("foo", content).await?;
/// # anyhow::Ok(()) };
/// ```
///
/// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
#[cfg(feature = "experimental-encrypted-state-events")]
pub fn send_state_event_for_key<'a, C, K>(
    &'a self,
    state_key: &K,
    content: C,
) -> SendStateEvent<'a>
where
    C: StateEventContent,
    C::StateKey: Borrow<K>,
    K: AsRef<str> + ?Sized,
{
    SendStateEvent::new(self, state_key, content)
}
3235
3236    /// Send a raw room state event to the homeserver.
3237    ///
3238    /// Returns the parsed response from the server.
3239    ///
3240    /// # Arguments
3241    ///
3242    /// * `event_type` - The type of the event that we're sending out.
3243    ///
3244    /// * `state_key` - A unique key which defines the overwriting semantics for
3245    /// this piece of room state. This value is often a zero-length string.
3246    ///
3247    /// * `content` - The content of the event as a raw JSON value. The argument
3248    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3249    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3250    ///
3251    /// # Examples
3252    ///
3253    /// ```no_run
3254    /// use serde_json::json;
3255    ///
3256    /// # async {
3257    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3258    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3259    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3260    ///
3261    /// if let Some(room) = client.get_room(&room_id) {
3262    ///     room.send_state_event_raw("m.room.member", "", json!({
3263    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3264    ///         "displayname": "Alice Margatroid",
3265    ///         "membership": "join",
3266    ///     })).await?;
3267    /// }
3268    /// # anyhow::Ok(()) };
3269    /// ```
3270    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3271    #[instrument(skip_all)]
3272    pub async fn send_state_event_raw(
3273        &self,
3274        event_type: &str,
3275        state_key: &str,
3276        content: impl IntoRawStateEventContent,
3277    ) -> Result<send_state_event::v3::Response> {
3278        self.ensure_room_joined()?;
3279
3280        let request = send_state_event::v3::Request::new_raw(
3281            self.room_id().to_owned(),
3282            event_type.into(),
3283            state_key.to_owned(),
3284            content.into_raw_state_event_content(),
3285        );
3286
3287        Ok(self.client.send(request).await?)
3288    }
3289
/// Send a raw room state event to the homeserver.
///
/// With the experimental state event encryption feature enabled, the event
/// is transparently encrypted when this room is encrypted, unless its event
/// type is considered critical for the room to function, as outlined in
/// [MSC4362][msc4362].
///
/// The call itself only builds a [`SendRawStateEvent`] from its arguments;
/// as the example shows, that value is awaited to obtain the parsed
/// response from the server.
///
/// # Arguments
///
/// * `event_type` - The type of the event that we're sending out.
///
/// * `state_key` - A unique key which defines the overwriting semantics for
///   this piece of room state. This value is often a zero-length string.
///
/// * `content` - The content of the event as a raw JSON value. The argument
///   type can be `serde_json::Value`, but also other raw JSON types; for
///   the full list check the documentation of [`IntoRawStateEventContent`].
///
/// # Examples
///
/// ```no_run
/// use serde_json::json;
///
/// # async {
/// # let homeserver = url::Url::parse("http://localhost:8080")?;
/// # let mut client = matrix_sdk::Client::new(homeserver).await?;
/// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
///
/// if let Some(room) = client.get_room(&room_id) {
///     room.send_state_event_raw("m.room.member", "", json!({
///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
///         "displayname": "Alice Margatroid",
///         "membership": "join",
///     })).await?;
/// }
/// # anyhow::Ok(()) };
/// ```
///
/// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event_raw<'a>(
    &'a self,
    event_type: &'a str,
    state_key: &'a str,
    content: impl IntoRawStateEventContent,
) -> SendRawStateEvent<'a> {
    SendRawStateEvent::new(self, event_type, state_key, content)
}
3341
3342    /// Strips all information out of an event of the room.
3343    ///
3344    /// Returns the [`redact_event::v3::Response`] from the server.
3345    ///
3346    /// This cannot be undone. Users may redact their own events, and any user
3347    /// with a power level greater than or equal to the redact power level of
3348    /// the room may redact events there.
3349    ///
3350    /// # Arguments
3351    ///
3352    /// * `event_id` - The ID of the event to redact
3353    ///
3354    /// * `reason` - The reason for the event being redacted.
3355    ///
3356    /// * `txn_id` - A unique ID that can be attached to this event as
3357    /// its transaction ID. If not given one is created for the message.
3358    ///
3359    /// # Examples
3360    ///
3361    /// ```no_run
3362    /// use matrix_sdk::ruma::event_id;
3363    ///
3364    /// # async {
3365    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3366    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3367    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3368    /// #
3369    /// if let Some(room) = client.get_room(&room_id) {
3370    ///     let event_id = event_id!("$xxxxxx:example.org");
3371    ///     let reason = Some("Indecent material");
3372    ///     room.redact(&event_id, reason, None).await?;
3373    /// }
3374    /// # anyhow::Ok(()) };
3375    /// ```
3376    #[instrument(skip_all)]
3377    pub async fn redact(
3378        &self,
3379        event_id: &EventId,
3380        reason: Option<&str>,
3381        txn_id: Option<OwnedTransactionId>,
3382    ) -> HttpResult<redact_event::v3::Response> {
3383        let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3384        let request = assign!(
3385            redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3386            { reason: reason.map(ToOwned::to_owned) }
3387        );
3388
3389        self.client.send(request).await
3390    }
3391
3392    /// Get a list of servers that should know this room.
3393    ///
3394    /// Uses the synced members of the room and the suggested [routing
3395    /// algorithm] from the Matrix spec.
3396    ///
3397    /// Returns at most three servers.
3398    ///
3399    /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3400    pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3401        let acl_ev = self
3402            .get_state_event_static::<RoomServerAclEventContent>()
3403            .await?
3404            .and_then(|ev| ev.deserialize().ok());
3405        let acl = acl_ev.as_ref().and_then(|ev| match ev {
3406            SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3407            SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3408        });
3409
3410        // Filter out server names that:
3411        // - Are blocked due to server ACLs
3412        // - Are IP addresses
3413        let members: Vec<_> = self
3414            .members_no_sync(RoomMemberships::JOIN)
3415            .await?
3416            .into_iter()
3417            .filter(|member| {
3418                let server = member.user_id().server_name();
3419                acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3420            })
3421            .collect();
3422
3423        // Get the server of the highest power level user in the room, provided
3424        // they are at least power level 50.
3425        let max = members
3426            .iter()
3427            .max_by_key(|member| member.power_level())
3428            .filter(|max| max.power_level() >= int!(50))
3429            .map(|member| member.user_id().server_name());
3430
3431        // Sort the servers by population.
3432        let servers = members
3433            .iter()
3434            .map(|member| member.user_id().server_name())
3435            .filter(|server| max.filter(|max| max == server).is_none())
3436            .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3437                *servers.entry(server).or_default() += 1;
3438                servers
3439            });
3440        let mut servers: Vec<_> = servers.into_iter().collect();
3441        servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3442
3443        Ok(max
3444            .into_iter()
3445            .chain(servers.into_iter().map(|(name, _)| name))
3446            .take(3)
3447            .map(ToOwned::to_owned)
3448            .collect())
3449    }
3450
3451    /// Get a `matrix.to` permalink to this room.
3452    ///
3453    /// If this room has an alias, we use it. Otherwise, we try to use the
3454    /// synced members in the room for [routing] the room ID.
3455    ///
3456    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3457    pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3458        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3459            return Ok(alias.matrix_to_uri());
3460        }
3461
3462        let via = self.route().await?;
3463        Ok(self.room_id().matrix_to_uri_via(via))
3464    }
3465
3466    /// Get a `matrix:` permalink to this room.
3467    ///
3468    /// If this room has an alias, we use it. Otherwise, we try to use the
3469    /// synced members in the room for [routing] the room ID.
3470    ///
3471    /// # Arguments
3472    ///
3473    /// * `join` - Whether the user should join the room.
3474    ///
3475    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3476    pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3477        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3478            return Ok(alias.matrix_uri(join));
3479        }
3480
3481        let via = self.route().await?;
3482        Ok(self.room_id().matrix_uri_via(via, join))
3483    }
3484
3485    /// Get a `matrix.to` permalink to an event in this room.
3486    ///
3487    /// We try to use the synced members in the room for [routing] the room ID.
3488    ///
3489    /// *Note*: This method does not check if the given event ID is actually
3490    /// part of this room. It needs to be checked before calling this method
3491    /// otherwise the permalink won't work.
3492    ///
3493    /// # Arguments
3494    ///
3495    /// * `event_id` - The ID of the event.
3496    ///
3497    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3498    pub async fn matrix_to_event_permalink(
3499        &self,
3500        event_id: impl Into<OwnedEventId>,
3501    ) -> Result<MatrixToUri> {
3502        // Don't use the alias because an event is tied to a room ID, but an
3503        // alias might point to another room, e.g. after a room upgrade.
3504        let via = self.route().await?;
3505        Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3506    }
3507
3508    /// Get a `matrix:` permalink to an event in this room.
3509    ///
3510    /// We try to use the synced members in the room for [routing] the room ID.
3511    ///
3512    /// *Note*: This method does not check if the given event ID is actually
3513    /// part of this room. It needs to be checked before calling this method
3514    /// otherwise the permalink won't work.
3515    ///
3516    /// # Arguments
3517    ///
3518    /// * `event_id` - The ID of the event.
3519    ///
3520    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3521    pub async fn matrix_event_permalink(
3522        &self,
3523        event_id: impl Into<OwnedEventId>,
3524    ) -> Result<MatrixUri> {
3525        // Don't use the alias because an event is tied to a room ID, but an
3526        // alias might point to another room, e.g. after a room upgrade.
3527        let via = self.route().await?;
3528        Ok(self.room_id().matrix_event_uri_via(event_id, via))
3529    }
3530
3531    /// Get the latest receipt of a user in this room.
3532    ///
3533    /// # Arguments
3534    ///
3535    /// * `receipt_type` - The type of receipt to get.
3536    ///
3537    /// * `thread` - The thread containing the event of the receipt, if any.
3538    ///
3539    /// * `user_id` - The ID of the user.
3540    ///
3541    /// Returns the ID of the event on which the receipt applies and the
3542    /// receipt.
3543    pub async fn load_user_receipt(
3544        &self,
3545        receipt_type: ReceiptType,
3546        thread: ReceiptThread,
3547        user_id: &UserId,
3548    ) -> Result<Option<(OwnedEventId, Receipt)>> {
3549        self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3550    }
3551
3552    /// Load the receipts for an event in this room from storage.
3553    ///
3554    /// # Arguments
3555    ///
3556    /// * `receipt_type` - The type of receipt to get.
3557    ///
3558    /// * `thread` - The thread containing the event of the receipt, if any.
3559    ///
3560    /// * `event_id` - The ID of the event.
3561    ///
3562    /// Returns a list of IDs of users who have sent a receipt for the event and
3563    /// the corresponding receipts.
3564    pub async fn load_event_receipts(
3565        &self,
3566        receipt_type: ReceiptType,
3567        thread: ReceiptThread,
3568        event_id: &EventId,
3569    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3570        self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3571    }
3572
3573    /// Get the push-condition context for this room.
3574    ///
3575    /// Returns `None` if some data couldn't be found. This should only happen
3576    /// in brand new rooms, while we process its state.
3577    pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3578        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions().await?)
3579            .await
3580    }
3581
3582    /// Get the push-condition context for this room, with a choice to include
3583    /// thread subscriptions or not, based on the extra
3584    /// `with_threads_subscriptions` parameter.
3585    ///
3586    /// Returns `None` if some data couldn't be found. This should only happen
3587    /// in brand new rooms, while we process its state.
3588    pub(crate) async fn push_condition_room_ctx_internal(
3589        &self,
3590        with_threads_subscriptions: bool,
3591    ) -> Result<Option<PushConditionRoomCtx>> {
3592        let room_id = self.room_id();
3593        let user_id = self.own_user_id();
3594        let room_info = self.clone_info();
3595        let member_count = room_info.active_members_count();
3596
3597        let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3598            member.name().to_owned()
3599        } else {
3600            return Ok(None);
3601        };
3602
3603        let power_levels = match self.power_levels().await {
3604            Ok(power_levels) => Some(power_levels.into()),
3605            Err(error) => {
3606                if matches!(room_info.state(), RoomState::Joined) {
3607                    // It's normal to not have the power levels in a non-joined room, so don't log
3608                    // the error if the room is not joined
3609                    error!("Could not compute power levels for push conditions: {error}");
3610                }
3611                None
3612            }
3613        };
3614
3615        let mut ctx = assign!(PushConditionRoomCtx::new(
3616            room_id.to_owned(),
3617            UInt::new(member_count).unwrap_or(UInt::MAX),
3618            user_id.to_owned(),
3619            user_display_name,
3620        ),
3621        {
3622            power_levels,
3623        });
3624
3625        if with_threads_subscriptions {
3626            let this = self.clone();
3627            ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3628                let room = this.clone();
3629                Box::pin(async move {
3630                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3631                        maybe_sub.is_some()
3632                    } else {
3633                        false
3634                    }
3635                })
3636            });
3637        }
3638
3639        Ok(Some(ctx))
3640    }
3641
3642    /// Retrieves a [`PushContext`] that can be used to compute the push
3643    /// actions for events.
3644    pub async fn push_context(&self) -> Result<Option<PushContext>> {
3645        self.push_context_internal(self.client.enabled_thread_subscriptions().await?).await
3646    }
3647
3648    /// Retrieves a [`PushContext`] that can be used to compute the push actions
3649    /// for events, with a choice to include thread subscriptions or not,
3650    /// based on the extra `with_threads_subscriptions` parameter.
3651    #[instrument(skip(self))]
3652    pub(crate) async fn push_context_internal(
3653        &self,
3654        with_threads_subscriptions: bool,
3655    ) -> Result<Option<PushContext>> {
3656        let Some(push_condition_room_ctx) =
3657            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3658        else {
3659            debug!("Could not aggregate push context");
3660            return Ok(None);
3661        };
3662        let push_rules = self.client().account().push_rules().await?;
3663        Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3664    }
3665
3666    /// Get the push actions for the given event with the current room state.
3667    ///
3668    /// Note that it is possible that no push action is returned because the
3669    /// current room state does not have all the required state events.
3670    pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3671        if let Some(ctx) = self.push_context().await? {
3672            Ok(Some(ctx.for_event(event).await))
3673        } else {
3674            Ok(None)
3675        }
3676    }
3677
3678    /// The membership details of the (latest) invite for the logged-in user in
3679    /// this room.
3680    pub async fn invite_details(&self) -> Result<Invite> {
3681        let state = self.state();
3682
3683        if state != RoomState::Invited {
3684            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3685        }
3686
3687        let invitee = self
3688            .get_member_no_sync(self.own_user_id())
3689            .await?
3690            .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3691        let event = invitee.event();
3692
3693        let inviter_id = event.sender().to_owned();
3694        let inviter = self.get_member_no_sync(&inviter_id).await?;
3695
3696        Ok(Invite { invitee, inviter_id, inviter })
3697    }
3698
3699    /// Get the membership details for the current user.
3700    ///
3701    /// Returns:
3702    ///     - If the user was present in the room, a
3703    ///       [`RoomMemberWithSenderInfo`] containing both the user info and the
3704    ///       member info of the sender of the `m.room.member` event.
3705    ///     - If the current user is not present, an error.
3706    pub async fn member_with_sender_info(
3707        &self,
3708        user_id: &UserId,
3709    ) -> Result<RoomMemberWithSenderInfo> {
3710        let Some(member) = self.get_member_no_sync(user_id).await? else {
3711            return Err(Error::InsufficientData);
3712        };
3713
3714        let sender_member =
3715            if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3716                // If the sender room member info is already available, return it
3717                Some(member)
3718            } else if self.are_members_synced() {
3719                // The room members are synced and we couldn't find the sender info
3720                None
3721            } else if self.sync_members().await.is_ok() {
3722                // Try getting the sender room member info again after syncing
3723                self.get_member_no_sync(member.event().sender()).await?
3724            } else {
3725                None
3726            };
3727
3728        Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3729    }
3730
3731    /// Forget this room.
3732    ///
3733    /// This communicates to the homeserver that it should forget the room.
3734    ///
3735    /// Only left or banned-from rooms can be forgotten.
3736    pub async fn forget(&self) -> Result<()> {
3737        let state = self.state();
3738        match state {
3739            RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3740                return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3741                    "Left / Banned",
3742                    state,
3743                ))));
3744            }
3745            RoomState::Left | RoomState::Banned => {}
3746        }
3747
3748        let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3749        let _response = self.client.send(request).await?;
3750
3751        // If it was a DM, remove the room from the `m.direct` global account data.
3752        if self.inner.direct_targets_length() != 0
3753            && let Err(e) = self.set_is_direct(false).await
3754        {
3755            // It is not important whether we managed to remove the room, it will not have
3756            // any consequences, so just log the error.
3757            warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3758        }
3759
3760        self.client.base_client().forget_room(self.inner.room_id()).await?;
3761
3762        Ok(())
3763    }
3764
3765    fn ensure_room_joined(&self) -> Result<()> {
3766        let state = self.state();
3767        if state == RoomState::Joined {
3768            Ok(())
3769        } else {
3770            Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3771        }
3772    }
3773
3774    /// Get the notification mode.
3775    pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3776        if !matches!(self.state(), RoomState::Joined) {
3777            return None;
3778        }
3779
3780        let notification_settings = self.client().notification_settings().await;
3781
3782        // Get the user-defined mode if available
3783        let notification_mode =
3784            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3785
3786        if notification_mode.is_some() {
3787            notification_mode
3788        } else if let Ok(is_encrypted) =
3789            self.latest_encryption_state().await.map(|state| state.is_encrypted())
3790        {
3791            // Otherwise, if encrypted status is available, get the default mode for this
3792            // type of room.
3793            // From the point of view of notification settings, a `one-to-one` room is one
3794            // that involves exactly two people.
3795            let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3796            let default_mode = notification_settings
3797                .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3798                .await;
3799            Some(default_mode)
3800        } else {
3801            None
3802        }
3803    }
3804
3805    /// Get the user-defined notification mode.
3806    ///
3807    /// The result is cached for fast and non-async call. To read the cached
3808    /// result, use
3809    /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3810    //
3811    // Note for maintainers:
3812    //
3813    // The fact the result is cached is an important property. If you change that in
3814    // the future, please review all calls to this method.
3815    pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3816        if !matches!(self.state(), RoomState::Joined) {
3817            return None;
3818        }
3819
3820        let notification_settings = self.client().notification_settings().await;
3821
3822        // Get the user-defined mode if available.
3823        let mode =
3824            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3825
3826        if let Some(mode) = mode {
3827            self.update_cached_user_defined_notification_mode(mode);
3828        }
3829
3830        mode
3831    }
3832
3833    /// Report an event as inappropriate to the homeserver's administrator.
3834    ///
3835    /// # Arguments
3836    ///
3837    /// * `event_id` - The ID of the event to report.
3838    /// * `score` - The score to rate this content.
3839    /// * `reason` - The reason the content is being reported.
3840    ///
3841    /// # Errors
3842    ///
3843    /// Returns an error if the room is not joined or if an error occurs with
3844    /// the request.
3845    pub async fn report_content(
3846        &self,
3847        event_id: OwnedEventId,
3848        reason: Option<String>,
3849    ) -> Result<report_content::v3::Response> {
3850        let state = self.state();
3851        if state != RoomState::Joined {
3852            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3853        }
3854
3855        let request = assign!(
3856            report_content::v3::Request::new(
3857                self.inner.room_id().to_owned(),
3858                event_id,
3859            ), {
3860                reason: reason
3861            }
3862        );
3863        Ok(self.client.send(request).await?)
3864    }
3865
3866    /// Reports a room as inappropriate to the server.
3867    /// The caller is not required to be joined to the room to report it.
3868    ///
3869    /// # Arguments
3870    ///
3871    /// * `reason` - The reason the room is being reported.
3872    ///
3873    /// # Errors
3874    ///
3875    /// Returns an error if the room is not found or on rate limit
3876    pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3877        let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3878
3879        Ok(self.client.send(request).await?)
3880    }
3881
3882    /// Set a flag on the room to indicate that the user has explicitly marked
3883    /// it as (un)read.
3884    ///
3885    /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3886    /// value as `unread`.
3887    pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3888        if self.is_marked_unread() == unread {
3889            // The request is not necessary.
3890            return Ok(());
3891        }
3892
3893        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3894
3895        let content = MarkedUnreadEventContent::new(unread);
3896
3897        let request = set_room_account_data::v3::Request::new(
3898            user_id.to_owned(),
3899            self.inner.room_id().to_owned(),
3900            &content,
3901        )?;
3902
3903        self.client.send(request).await?;
3904        Ok(())
3905    }
3906
3907    /// Returns the [`RoomEventCache`] associated to this room, assuming the
3908    /// global [`EventCache`] has been enabled for subscription.
3909    pub async fn event_cache(
3910        &self,
3911    ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3912        self.client.event_cache().for_room(self.room_id()).await
3913    }
3914
3915    /// Get the beacon information event in the room for the `user_id`.
3916    ///
3917    /// # Errors
3918    ///
3919    /// Returns an error if the event is redacted, stripped, not found or could
3920    /// not be deserialized.
3921    pub(crate) async fn get_user_beacon_info(
3922        &self,
3923        user_id: &UserId,
3924    ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3925        let raw_event = self
3926            .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3927            .await?
3928            .ok_or(BeaconError::NotFound)?;
3929
3930        match raw_event.deserialize()? {
3931            SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3932            SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3933            SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3934        }
3935    }
3936
3937    /// Start sharing live location in the room.
3938    ///
3939    /// # Arguments
3940    ///
3941    /// * `duration_millis` - The duration for which the live location is
3942    ///   shared, in milliseconds.
3943    /// * `description` - An optional description for the live location share.
3944    ///
3945    /// # Errors
3946    ///
3947    /// Returns an error if the room is not joined or if the state event could
3948    /// not be sent.
3949    pub async fn start_live_location_share(
3950        &self,
3951        duration_millis: u64,
3952        description: Option<String>,
3953    ) -> Result<send_state_event::v3::Response> {
3954        self.ensure_room_joined()?;
3955
3956        self.send_state_event_for_key(
3957            self.own_user_id(),
3958            BeaconInfoEventContent::new(
3959                description,
3960                Duration::from_millis(duration_millis),
3961                true,
3962                None,
3963            ),
3964        )
3965        .await
3966    }
3967
3968    /// Stop sharing live location in the room.
3969    ///
3970    /// # Errors
3971    ///
3972    /// Returns an error if the room is not joined, if the beacon information
3973    /// is redacted or stripped, or if the state event is not found.
3974    pub async fn stop_live_location_share(
3975        &self,
3976    ) -> Result<send_state_event::v3::Response, BeaconError> {
3977        self.ensure_room_joined()?;
3978
3979        let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3980        beacon_info_event.content.stop();
3981        Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3982    }
3983
3984    /// Send a location beacon event in the current room.
3985    ///
3986    /// # Arguments
3987    ///
3988    /// * `geo_uri` - The geo URI of the location beacon.
3989    ///
3990    /// # Errors
3991    ///
3992    /// Returns an error if the room is not joined, if the beacon information
3993    /// is redacted or stripped, if the location share is no longer live,
3994    /// or if the state event is not found.
3995    pub async fn send_location_beacon(
3996        &self,
3997        geo_uri: String,
3998    ) -> Result<send_message_event::v3::Response, BeaconError> {
3999        self.ensure_room_joined()?;
4000
4001        let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
4002
4003        if beacon_info_event.content.is_live() {
4004            let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
4005            Ok(self.send(content).await?.response)
4006        } else {
4007            Err(BeaconError::NotLive)
4008        }
4009    }
4010
4011    /// Store the given `ComposerDraft` in the state store using the current
4012    /// room id and optional thread root id as identifier.
4013    pub async fn save_composer_draft(
4014        &self,
4015        draft: ComposerDraft,
4016        thread_root: Option<&EventId>,
4017    ) -> Result<()> {
4018        self.client
4019            .state_store()
4020            .set_kv_data(
4021                StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
4022                StateStoreDataValue::ComposerDraft(draft),
4023            )
4024            .await?;
4025        Ok(())
4026    }
4027
4028    /// Retrieve the `ComposerDraft` stored in the state store for this room
4029    /// and given thread, if any.
4030    pub async fn load_composer_draft(
4031        &self,
4032        thread_root: Option<&EventId>,
4033    ) -> Result<Option<ComposerDraft>> {
4034        let data = self
4035            .client
4036            .state_store()
4037            .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4038            .await?;
4039        Ok(data.and_then(|d| d.into_composer_draft()))
4040    }
4041
4042    /// Remove the `ComposerDraft` stored in the state store for this room
4043    /// and given thread, if any.
4044    pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
4045        self.client
4046            .state_store()
4047            .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4048            .await?;
4049        Ok(())
4050    }
4051
4052    /// Load pinned state events for a room from the `/state` endpoint in the
4053    /// home server.
4054    pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
4055        let response = self
4056            .client
4057            .send(get_state_event_for_key::v3::Request::new(
4058                self.room_id().to_owned(),
4059                StateEventType::RoomPinnedEvents,
4060                "".to_owned(),
4061            ))
4062            .await;
4063
4064        match response {
4065            Ok(response) => Ok(Some(
4066                response
4067                    .into_content()
4068                    .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
4069                    .pinned,
4070            )),
4071            Err(http_error) => match http_error.as_client_api_error() {
4072                Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
4073                _ => Err(http_error.into()),
4074            },
4075        }
4076    }
4077
4078    /// Observe live location sharing events for this room.
4079    ///
4080    /// The returned observable will receive the newest event for each sync
4081    /// response that contains an `m.beacon` event.
4082    ///
4083    /// Returns a stream of [`ObservableLiveLocation`] events from other users
4084    /// in the room, excluding the live location events of the room's own user.
4085    pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
4086        ObservableLiveLocation::new(&self.client, self.room_id())
4087    }
4088
    /// Subscribe to knock requests in this `Room`.
    ///
    /// The current requests to join the room will be emitted immediately
    /// when subscribing.
    ///
    /// A new set of knock requests will be emitted whenever:
    /// - A new member event is received.
    /// - A knock request is marked as seen.
    /// - A sync is gappy (limited), so room membership information may be
    ///   outdated.
    ///
    /// Returns both a stream of knock requests and a handle for a task that
    /// will clean up the seen knock request ids when possible.
    ///
    /// # Errors
    ///
    /// Returns an error if the currently seen knock request ids cannot be
    /// loaded. Failures while recomputing the requests inside the stream are
    /// only logged as warnings; the stream keeps running.
    pub async fn subscribe_to_knock_requests(
        &self,
    ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
        // Shared handle to this room, moved into the stream below.
        let this = Arc::new(self.clone());

        let room_member_events_observer =
            self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());

        // Snapshot of the seen ids, used for the initial emission, plus a stream of
        // later updates (a missing value is treated as an empty map).
        let current_seen_ids = self.get_seen_knock_request_ids().await?;
        let mut seen_request_ids_stream = self
            .seen_knock_request_ids_map
            .subscribe()
            .await
            .map(|values| values.unwrap_or_default());

        let mut room_info_stream = self.subscribe_info();

        // Spawn a task that will clean up the seen knock request ids when updated room
        // members are received
        let clear_seen_ids_handle = spawn({
            let this = self.clone();
            async move {
                let mut member_updates_stream = this.room_member_updates_sender.subscribe();
                // The loop ends as soon as receiving from the updates channel fails.
                while member_updates_stream.recv().await.is_ok() {
                    // If room members were updated, try to remove outdated seen knock request ids
                    if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
                        warn!("Failed to remove seen knock requests: {err}")
                    }
                }
            }
        });

        let combined_stream = stream! {
            // Emit current requests to join
            match this.get_current_join_requests(&current_seen_ids).await {
                Ok(initial_requests) => yield initial_requests,
                Err(err) => warn!("Failed to get initial requests to join: {err}")
            }

            let mut requests_stream = room_member_events_observer.subscribe();
            // Latest known seen ids; replaced whenever the seen ids stream emits.
            let mut seen_ids = current_seen_ids.clone();

            loop {
                // This is equivalent to a combine stream operation, triggering a new emission
                // when any of the branches changes
                tokio::select! {
                    Some((event, _)) = requests_stream.next() => {
                        // Only original member events are considered here.
                        if let Some(event) = event.as_original() {
                            // If we can calculate the membership change, try to emit only when needed
                            let emit = if event.prev_content().is_some() {
                                matches!(event.membership_change(),
                                    MembershipChange::Banned |
                                    MembershipChange::Knocked |
                                    MembershipChange::KnockAccepted |
                                    MembershipChange::KnockDenied |
                                    MembershipChange::KnockRetracted
                                )
                            } else {
                                // If we can't calculate the membership change, assume we need to
                                // emit updated values
                                true
                            };

                            if emit {
                                match this.get_current_join_requests(&seen_ids).await {
                                    Ok(requests) => yield requests,
                                    Err(err) => {
                                        warn!("Failed to get updated knock requests on new member event: {err}")
                                    }
                                }
                            }
                        }
                    }

                    Some(new_seen_ids) = seen_request_ids_stream.next() => {
                        // Update the current seen ids
                        seen_ids = new_seen_ids;

                        // If seen requests have changed we need to recalculate
                        // all the knock requests
                        match this.get_current_join_requests(&seen_ids).await {
                            Ok(requests) => yield requests,
                            Err(err) => {
                                warn!("Failed to get updated knock requests on seen ids changed: {err}")
                            }
                        }
                    }

                    Some(room_info) = room_info_stream.next() => {
                        // We need to emit new items when we may have missing room members:
                        // this usually happens after a gappy (limited) sync
                        if !room_info.are_members_synced() {
                            match this.get_current_join_requests(&seen_ids).await {
                                Ok(requests) => yield requests,
                                Err(err) => {
                                    warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
                                }
                            }
                        }
                    }
                    // If the streams in all branches are closed, stop the loop
                    else => break,
                }
            }
        };

        Ok((combined_stream, clear_seen_ids_handle))
    }
4210
4211    async fn get_current_join_requests(
4212        &self,
4213        seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4214    ) -> Result<Vec<KnockRequest>> {
4215        Ok(self
4216            .members(RoomMemberships::KNOCK)
4217            .await?
4218            .into_iter()
4219            .filter_map(|member| {
4220                let event_id = member.event().event_id()?;
4221                Some(KnockRequest::new(
4222                    self,
4223                    event_id,
4224                    member.event().timestamp(),
4225                    KnockRequestMemberInfo::from_member(&member),
4226                    seen_request_ids.contains_key(event_id),
4227                ))
4228            })
4229            .collect())
4230    }
4231
4232    /// Access the room settings related to privacy and visibility.
4233    pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4234        RoomPrivacySettings::new(&self.inner, &self.client)
4235    }
4236
4237    /// Retrieve a list of all the threads for the current room.
4238    ///
4239    /// Since this client-server API is paginated, the return type may include a
4240    /// token used to resuming back-pagination into the list of results, in
4241    /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4242    /// [`ListThreadsOptions::from`] to continue the pagination
4243    /// from the previous position.
4244    pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4245        let request = opts.into_request(self.room_id());
4246
4247        let response = self.client.send(request).await?;
4248
4249        let push_ctx = self.push_context().await?;
4250        let chunk = join_all(
4251            response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4252        )
4253        .await;
4254
4255        Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4256    }
4257
4258    /// Retrieve a list of relations for the given event, according to the given
4259    /// options, using the network.
4260    ///
4261    /// Since this client-server API is paginated, the return type may include a
4262    /// token used to resuming back-pagination into the list of results, in
4263    /// [`Relations::prev_batch_token`]. This token can be fed back into
4264    /// [`RelationsOptions::from`] to continue the pagination from the previous
4265    /// position.
4266    ///
4267    /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4268    /// then it must be used with the same
4269    /// [`RelationsOptions::include_relations`] value as the request that
4270    /// returns the `from` token, otherwise the server behavior is undefined.
4271    pub async fn relations(
4272        &self,
4273        event_id: OwnedEventId,
4274        opts: RelationsOptions,
4275    ) -> Result<Relations> {
4276        let relations = opts.send(self, event_id).await;
4277
4278        // Save any new related events to the cache.
4279        if let Ok(Relations { chunk, .. }) = &relations
4280            && let Ok((cache, _handles)) = self.event_cache().await
4281        {
4282            cache.save_events(chunk.clone()).await;
4283        }
4284
4285        relations
4286    }
4287
4288    /// Search this room's [`RoomIndex`] for query and return at most
4289    /// max_number_of_results results.
4290    #[cfg(feature = "experimental-search")]
4291    pub async fn search(
4292        &self,
4293        query: &str,
4294        max_number_of_results: usize,
4295        pagination_offset: Option<usize>,
4296    ) -> Result<Vec<OwnedEventId>, IndexError> {
4297        let mut search_index_guard = self.client.search_index().lock().await;
4298        search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4299    }
4300
4301    /// Subscribe to a given thread in this room.
4302    ///
4303    /// This will subscribe the user to the thread, so that they will receive
4304    /// notifications for that thread specifically.
4305    ///
4306    /// # Arguments
4307    ///
4308    /// - `thread_root`: The ID of the thread root event to subscribe to.
4309    /// - `automatic`: Whether the subscription was made automatically by a
4310    ///   client, not by manual user choice. If set, must include the latest
4311    ///   event ID that's known in the thread and that is causing the automatic
4312    ///   subscription. If unset (i.e. we're now subscribing manually) and there
4313    ///   was a previous automatic subscription, the subscription will be
4314    ///   overridden to a manual one instead.
4315    ///
4316    /// # Returns
4317    ///
4318    /// - A 404 error if the event isn't known, or isn't a thread root.
4319    /// - An `Ok` result if the subscription was successful, or if the server
4320    ///   skipped an automatic subscription (as the user unsubscribed from the
4321    ///   thread after the event causing the automatic subscription).
4322    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4323    pub async fn subscribe_thread(
4324        &self,
4325        thread_root: OwnedEventId,
4326        automatic: Option<OwnedEventId>,
4327    ) -> Result<()> {
4328        let is_automatic = automatic.is_some();
4329
4330        match self
4331            .client
4332            .send(subscribe_thread::unstable::Request::new(
4333                self.room_id().to_owned(),
4334                thread_root.clone(),
4335                automatic,
4336            ))
4337            .await
4338        {
4339            Ok(_response) => {
4340                trace!("Server acknowledged the thread subscription; saving in db");
4341
4342                // Immediately save the result into the database.
4343                self.client
4344                    .state_store()
4345                    .upsert_thread_subscriptions(vec![(
4346                        self.room_id(),
4347                        &thread_root,
4348                        StoredThreadSubscription {
4349                            status: ThreadSubscriptionStatus::Subscribed {
4350                                automatic: is_automatic,
4351                            },
4352                            bump_stamp: None,
4353                        },
4354                    )])
4355                    .await?;
4356
4357                Ok(())
4358            }
4359
4360            Err(err) => {
4361                if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4362                    // In this case: the server indicates that the user unsubscribed *after* the
4363                    // event ID we've used in an automatic subscription; don't
4364                    // save the subscription state in the database, as the
4365                    // previous one should be more correct.
4366                    trace!("Thread subscription skipped: {err}");
4367                    Ok(())
4368                } else {
4369                    // Forward the error to the caller.
4370                    Err(err.into())
4371                }
4372            }
4373        }
4374    }
4375
4376    /// Subscribe to a thread if needed, based on a current subscription to it.
4377    ///
4378    /// This is like [`Self::subscribe_thread`], but it first checks if the user
4379    /// has already subscribed to a thread, so as to minimize sending
4380    /// unnecessary subscriptions which would be ignored by the server.
4381    pub async fn subscribe_thread_if_needed(
4382        &self,
4383        thread_root: &EventId,
4384        automatic: Option<OwnedEventId>,
4385    ) -> Result<()> {
4386        if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4387            // If we have a previous subscription, we should only send the new one if it's
4388            // manual and the previous one was automatic.
4389            if !prev_sub.automatic || automatic.is_some() {
4390                // Either we had already a manual subscription, or we had an automatic one and
4391                // the new one is automatic too: nothing to do!
4392                return Ok(());
4393            }
4394        }
4395        self.subscribe_thread(thread_root.to_owned(), automatic).await
4396    }
4397
4398    /// Unsubscribe from a given thread in this room.
4399    ///
4400    /// # Arguments
4401    ///
4402    /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4403    ///
4404    /// # Returns
4405    ///
4406    /// - An `Ok` result if the unsubscription was successful, or the thread was
4407    ///   already unsubscribed.
4408    /// - A 404 error if the event isn't known, or isn't a thread root.
4409    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4410    pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4411        self.client
4412            .send(unsubscribe_thread::unstable::Request::new(
4413                self.room_id().to_owned(),
4414                thread_root.clone(),
4415            ))
4416            .await?;
4417
4418        trace!("Server acknowledged the thread subscription removal; removed it from db too");
4419
4420        // Immediately save the result into the database.
4421        self.client
4422            .state_store()
4423            .upsert_thread_subscriptions(vec![(
4424                self.room_id(),
4425                &thread_root,
4426                StoredThreadSubscription {
4427                    status: ThreadSubscriptionStatus::Unsubscribed,
4428                    bump_stamp: None,
4429                },
4430            )])
4431            .await?;
4432
4433        Ok(())
4434    }
4435
4436    /// Return the current thread subscription for the given thread root in this
4437    /// room.
4438    ///
4439    /// # Arguments
4440    ///
4441    /// - `thread_root`: The ID of the thread root event to get the subscription
4442    ///   for.
4443    ///
4444    /// # Returns
4445    ///
4446    /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4447    ///   subscription information.
4448    /// - An `Ok` result with `None` if the subscription does not exist, or the
4449    ///   event couldn't be found, or the event isn't a thread.
4450    /// - An error if the request fails for any other reason, such as a network
4451    ///   error.
4452    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4453    pub async fn fetch_thread_subscription(
4454        &self,
4455        thread_root: OwnedEventId,
4456    ) -> Result<Option<ThreadSubscription>> {
4457        let result = self
4458            .client
4459            .send(get_thread_subscription::unstable::Request::new(
4460                self.room_id().to_owned(),
4461                thread_root.clone(),
4462            ))
4463            .await;
4464
4465        let subscription = match result {
4466            Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4467            Err(http_error) => match http_error.as_client_api_error() {
4468                Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4469                _ => return Err(http_error.into()),
4470            },
4471        };
4472
4473        // Keep the database in sync.
4474        if let Some(sub) = &subscription {
4475            self.client
4476                .state_store()
4477                .upsert_thread_subscriptions(vec![(
4478                    self.room_id(),
4479                    &thread_root,
4480                    StoredThreadSubscription {
4481                        status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4482                        bump_stamp: None,
4483                    },
4484                )])
4485                .await?;
4486        } else {
4487            // If the subscription was not found, remove it from the database.
4488            self.client
4489                .state_store()
4490                .remove_thread_subscription(self.room_id(), &thread_root)
4491                .await?;
4492        }
4493
4494        Ok(subscription)
4495    }
4496
4497    /// Return the current thread subscription for the given thread root in this
4498    /// room, by getting it from storage if possible, or fetching it from
4499    /// network otherwise.
4500    ///
4501    /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4502    /// this method.
4503    pub async fn load_or_fetch_thread_subscription(
4504        &self,
4505        thread_root: &EventId,
4506    ) -> Result<Option<ThreadSubscription>> {
4507        // If the thread subscriptions list is outdated, fetch from the server.
4508        if self.client.thread_subscription_catchup().is_outdated() {
4509            return self.fetch_thread_subscription(thread_root.to_owned()).await;
4510        }
4511
4512        // Otherwise, we can rely on the store information.
4513        Ok(self
4514            .client
4515            .state_store()
4516            .load_thread_subscription(self.room_id(), thread_root)
4517            .await
4518            .map(|maybe_sub| {
4519                maybe_sub.and_then(|stored| match stored.status {
4520                    ThreadSubscriptionStatus::Unsubscribed => None,
4521                    ThreadSubscriptionStatus::Subscribed { automatic } => {
4522                        Some(ThreadSubscription { automatic })
4523                    }
4524                })
4525            })?)
4526    }
4527
4528    /// Adds a new pinned event by sending an updated `m.room.pinned_events`
4529    /// event containing the new event id.
4530    ///
4531    /// This method will first try to get the pinned events from the current
4532    /// room's state and if it fails to do so it'll try to load them from the
4533    /// homeserver.
4534    ///
4535    /// Returns `true` if we pinned the event, `false` if the event was already
4536    /// pinned.
4537    pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
4538        let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4539            event_ids
4540        } else {
4541            self.load_pinned_events().await?.unwrap_or_default()
4542        };
4543        let event_id = event_id.to_owned();
4544        if pinned_event_ids.contains(&event_id) {
4545            Ok(false)
4546        } else {
4547            pinned_event_ids.push(event_id);
4548            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4549            self.send_state_event(content).await?;
4550            Ok(true)
4551        }
4552    }
4553
4554    /// Removes a pinned event by sending an updated `m.room.pinned_events`
4555    /// event without the event id we want to remove.
4556    ///
4557    /// This method will first try to get the pinned events from the current
4558    /// room's state and if it fails to do so it'll try to load them from the
4559    /// homeserver.
4560    ///
4561    /// Returns `true` if we unpinned the event, `false` if the event wasn't
4562    /// pinned before.
4563    pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
4564        let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4565            event_ids
4566        } else {
4567            self.load_pinned_events().await?.unwrap_or_default()
4568        };
4569        let event_id = event_id.to_owned();
4570        if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
4571            pinned_event_ids.remove(idx);
4572            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4573            self.send_state_event(content).await?;
4574            Ok(true)
4575        } else {
4576            Ok(false)
4577        }
4578    }
4579}
4580
#[cfg(feature = "e2e-encryption")]
impl RoomIdentityProvider for Room {
    /// Whether a member entry can be found for `user_id`; lookup errors count
    /// as "not a member".
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// Collect the known identities of the joined and invited members; a
    /// failure to load the members yields an empty list.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                // Members without a known identity are left out.
                let Some(identity) = self.user_identity(member.user_id()).await else {
                    continue;
                };
                identities.push(identity);
            }
            identities
        })
    }

    /// Look up the identity of `user_id`; lookup errors are reported as
    /// `None`.
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            match self.client.encryption().get_user_identity(user_id).await {
                Ok(Some(identity)) => Some(identity.underlying_identity()),
                _ => None,
            }
        })
    }
}
4615
/// A wrapper for a weak client and a room id that allows to lazily retrieve a
/// room, only when needed.
#[derive(Clone, Debug)]
pub(crate) struct WeakRoom {
    // Weak handle to the client; it may no longer resolve to a live client.
    client: WeakClient,
    // ID of the room to look up on the client.
    room_id: OwnedRoomId,
}
4623
4624impl WeakRoom {
4625    /// Create a new `WeakRoom` given its weak components.
4626    pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4627        Self { client, room_id }
4628    }
4629
4630    /// Attempts to reconstruct the room.
4631    pub fn get(&self) -> Option<Room> {
4632        self.client.get().and_then(|client| client.get_room(&self.room_id))
4633    }
4634
4635    /// The room id for that room.
4636    pub fn room_id(&self) -> &RoomId {
4637        &self.room_id
4638    }
4639}
4640
4641/// Details of the (latest) invite.
4642#[derive(Debug, Clone)]
4643pub struct Invite {
4644    /// Who has been invited.
4645    pub invitee: RoomMember,
4646
4647    /// The user ID of who sent the invite.
4648    ///
4649    /// This is useful if `Self::inviter` is `None`.
4650    pub inviter_id: OwnedUserId,
4651
4652    /// Who sent the invite.
4653    ///
4654    /// If `None`, check `Self::inviter_id`, it might be useful as a fallback.
4655    pub inviter: Option<RoomMember>,
4656}
4657
4658#[derive(Error, Debug)]
4659enum InvitationError {
4660    #[error("No membership event found")]
4661    EventMissing,
4662}
4663
4664/// Receipts to send all at once.
4665#[derive(Debug, Clone, Default)]
4666#[non_exhaustive]
4667pub struct Receipts {
4668    /// Fully-read marker (room account data).
4669    pub fully_read: Option<OwnedEventId>,
4670    /// Read receipt (public ephemeral room event).
4671    pub public_read_receipt: Option<OwnedEventId>,
4672    /// Read receipt (private ephemeral room event).
4673    pub private_read_receipt: Option<OwnedEventId>,
4674}
4675
4676impl Receipts {
4677    /// Create an empty `Receipts`.
4678    pub fn new() -> Self {
4679        Self::default()
4680    }
4681
4682    /// Set the last event the user has read.
4683    ///
4684    /// It means that the user has read all the events before this event.
4685    ///
4686    /// This is a private marker only visible by the user.
4687    ///
4688    /// Note that this is technically not a receipt as it is persisted in the
4689    /// room account data.
4690    pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4691        self.fully_read = event_id.into();
4692        self
4693    }
4694
4695    /// Set the last event presented to the user and forward it to the other
4696    /// users in the room.
4697    ///
4698    /// This is used to reset the unread messages/notification count and
4699    /// advertise to other users the last event that the user has likely seen.
4700    pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4701        self.public_read_receipt = event_id.into();
4702        self
4703    }
4704
4705    /// Set the last event presented to the user and don't forward it.
4706    ///
4707    /// This is used to reset the unread messages/notification count.
4708    pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4709        self.private_read_receipt = event_id.into();
4710        self
4711    }
4712
4713    /// Whether this `Receipts` is empty.
4714    pub fn is_empty(&self) -> bool {
4715        self.fully_read.is_none()
4716            && self.public_read_receipt.is_none()
4717            && self.private_read_receipt.is_none()
4718    }
4719}
4720
4721/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
4722/// listed by a room, possibly validated by checking the space's state.
4723#[derive(Debug)]
4724pub enum ParentSpace {
4725    /// The room recognizes the given room as its parent, and the parent
4726    /// recognizes it as its child.
4727    Reciprocal(Room),
4728    /// The room recognizes the given room as its parent, but the parent does
4729    /// not recognizes it as its child. However, the author of the
4730    /// `m.space.parent` event in the room has a sufficient power level in the
4731    /// parent to create the child event.
4732    WithPowerlevel(Room),
4733    /// The room recognizes the given room as its parent, but the parent does
4734    /// not recognizes it as its child.
4735    Illegitimate(Room),
4736    /// The room recognizes the given id as its parent room, but we cannot check
4737    /// whether the parent recognizes it as its child.
4738    Unverifiable(OwnedRoomId),
4739}
4740
4741trait EventSource {
4742    fn get_event(
4743        &self,
4744        event_id: &EventId,
4745    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4746}
4747
4748impl EventSource for &Room {
4749    async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4750        self.load_or_fetch_event(event_id, None).await
4751    }
4752}
4753
4754/// Contains the current user's room member info and the optional room member
4755/// info of the sender of the `m.room.member` event that this info represents.
4756#[derive(Debug)]
4757pub struct RoomMemberWithSenderInfo {
4758    /// The actual room member.
4759    pub room_member: RoomMember,
4760    /// The info of the sender of the event `room_member` is based on, if
4761    /// available.
4762    pub sender_info: Option<RoomMember>,
4763}
4764
4765#[cfg(all(test, not(target_family = "wasm")))]
4766mod tests {
4767    use std::collections::BTreeMap;
4768
4769    use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4770    use matrix_sdk_test::{
4771        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
4772    };
4773    use ruma::{
4774        RoomVersionId, event_id,
4775        events::{relation::RelationType, room::member::MembershipState},
4776        owned_event_id, room_id, user_id,
4777    };
4778    use wiremock::{
4779        Mock, MockServer, ResponseTemplate,
4780        matchers::{header, method, path_regex},
4781    };
4782
4783    use crate::{
4784        Client,
4785        config::RequestConfig,
4786        room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4787        test_utils::{
4788            client::mock_matrix_session,
4789            logged_in_client,
4790            mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4791        },
4792    };
4793
4794    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4795    #[async_test]
4796    async fn test_cache_invalidation_while_encrypt() {
4797        use matrix_sdk_base::store::RoomLoadSettings;
4798        use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4799
4800        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4801        let session = mock_matrix_session();
4802
4803        let client = Client::builder()
4804            .homeserver_url("http://localhost:1234")
4805            .request_config(RequestConfig::new().disable_retry())
4806            .sqlite_store(&sqlite_path, None)
4807            .build()
4808            .await
4809            .unwrap();
4810        client
4811            .matrix_auth()
4812            .restore_session(session.clone(), RoomLoadSettings::default())
4813            .await
4814            .unwrap();
4815
4816        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4817
4818        // Mock receiving an event to create an internal room.
4819        let server = MockServer::start().await;
4820        {
4821            Mock::given(method("GET"))
4822                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4823                .and(header("authorization", "Bearer 1234"))
4824                .respond_with(
4825                    ResponseTemplate::new(200)
4826                        .set_body_json(EventFactory::new().room_encryption().into_content()),
4827                )
4828                .mount(&server)
4829                .await;
4830            let f = EventFactory::new().sender(user_id!("@example:localhost"));
4831            let response = SyncResponseBuilder::default()
4832                .add_joined_room(
4833                    JoinedRoomBuilder::default()
4834                        .add_state_event(
4835                            f.member(user_id!("@example:localhost")).display_name("example"),
4836                        )
4837                        .add_state_event(f.default_power_levels())
4838                        .add_state_event(f.room_encryption()),
4839                )
4840                .build_sync_response();
4841            client.base_client().receive_sync_response(response).await.unwrap();
4842        }
4843
4844        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4845
4846        // Step 1, preshare the room keys.
4847        room.preshare_room_key().await.unwrap();
4848
4849        // Step 2, force lock invalidation by pretending another client obtained the
4850        // lock.
4851        {
4852            let client = Client::builder()
4853                .homeserver_url("http://localhost:1234")
4854                .request_config(RequestConfig::new().disable_retry())
4855                .sqlite_store(&sqlite_path, None)
4856                .build()
4857                .await
4858                .unwrap();
4859            client
4860                .matrix_auth()
4861                .restore_session(session.clone(), RoomLoadSettings::default())
4862                .await
4863                .unwrap();
4864            client
4865                .encryption()
4866                .enable_cross_process_store_lock("client2".to_owned())
4867                .await
4868                .unwrap();
4869
4870            let guard = client.encryption().spin_lock_store(None).await.unwrap();
4871            assert!(guard.is_some());
4872        }
4873
4874        // Step 3, take the crypto-store lock.
4875        let guard = client.encryption().spin_lock_store(None).await.unwrap();
4876        assert!(guard.is_some());
4877
4878        // Step 4, try to encrypt a message.
4879        let olm = client.olm_machine().await;
4880        let olm = olm.as_ref().expect("Olm machine wasn't started");
4881
4882        // Now pretend we're encrypting an event; the olm machine shouldn't rely on
4883        // caching the outgoing session before.
4884        let _encrypted_content = olm
4885            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4886            .await
4887            .unwrap();
4888    }
4889
4890    #[async_test]
4891    async fn test_composer_draft() {
4892        use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4893
4894        let client = logged_in_client(None).await;
4895
4896        let response = SyncResponseBuilder::default()
4897            .add_joined_room(JoinedRoomBuilder::default())
4898            .build_sync_response();
4899        client.base_client().receive_sync_response(response).await.unwrap();
4900        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4901
4902        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4903
4904        // Save 2 drafts, one for the room and one for a thread.
4905
4906        let draft = ComposerDraft {
4907            plain_text: "Hello, world!".to_owned(),
4908            html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4909            draft_type: ComposerDraftType::NewMessage,
4910            attachments: vec![DraftAttachment {
4911                filename: "cat.txt".to_owned(),
4912                content: matrix_sdk_base::DraftAttachmentContent::File {
4913                    data: b"meow".to_vec(),
4914                    mimetype: Some("text/plain".to_owned()),
4915                    size: Some(5),
4916                },
4917            }],
4918        };
4919
4920        room.save_composer_draft(draft.clone(), None).await.unwrap();
4921
4922        let thread_root = owned_event_id!("$thread_root:b.c");
4923        let thread_draft = ComposerDraft {
4924            plain_text: "Hello, thread!".to_owned(),
4925            html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4926            draft_type: ComposerDraftType::NewMessage,
4927            attachments: vec![DraftAttachment {
4928                filename: "dog.txt".to_owned(),
4929                content: matrix_sdk_base::DraftAttachmentContent::File {
4930                    data: b"wuv".to_vec(),
4931                    mimetype: Some("text/plain".to_owned()),
4932                    size: Some(4),
4933                },
4934            }],
4935        };
4936
4937        room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4938
4939        // Check that the room draft was saved correctly
4940        assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4941
4942        // Check that the thread draft was saved correctly
4943        assert_eq!(
4944            room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4945            Some(thread_draft.clone())
4946        );
4947
4948        // Clear the room draft
4949        room.clear_composer_draft(None).await.unwrap();
4950        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4951
4952        // Check that the thread one is still there
4953        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4954
4955        // Clear the thread draft as well
4956        room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4957        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4958    }
4959
4960    #[async_test]
4961    async fn test_mark_join_requests_as_seen() {
4962        let server = MatrixMockServer::new().await;
4963        let client = server.client_builder().build().await;
4964        let event_id = event_id!("$a:b.c");
4965        let room_id = room_id!("!a:b.c");
4966        let user_id = user_id!("@alice:b.c");
4967
4968        let f = EventFactory::new().room(room_id);
4969        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4970            f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4971        ]);
4972        let room = server.sync_room(&client, joined_room_builder).await;
4973
4974        // When loading the initial seen ids, there are none
4975        let seen_ids =
4976            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4977        assert!(seen_ids.is_empty());
4978
4979        // We mark a random event id as seen
4980        room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4981            .await
4982            .expect("Couldn't mark join request as seen");
4983
4984        // Then we can check it was successfully marked as seen
4985        let seen_ids =
4986            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4987        assert_eq!(seen_ids.len(), 1);
4988        assert_eq!(
4989            seen_ids.into_iter().next().expect("No next value"),
4990            (event_id.to_owned(), user_id.to_owned())
4991        )
4992    }
4993
4994    #[async_test]
4995    async fn test_own_room_membership_with_no_own_member_event() {
4996        let server = MatrixMockServer::new().await;
4997        let client = server.client_builder().build().await;
4998        let room_id = room_id!("!a:b.c");
4999
5000        let room = server.sync_joined_room(&client, room_id).await;
5001
5002        // Since there is no member event for the own user, the method fails.
5003        // This should never happen in an actual room.
5004        let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
5005        assert!(error.is_some());
5006    }
5007
5008    #[async_test]
5009    async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
5010        let server = MatrixMockServer::new().await;
5011        let client = server.client_builder().build().await;
5012        let room_id = room_id!("!a:b.c");
5013        let user_id = user_id!("@example:localhost");
5014
5015        let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5016        let joined_room_builder =
5017            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5018        let room = server.sync_room(&client, joined_room_builder).await;
5019
5020        // When we load the membership details
5021        let ret = room
5022            .member_with_sender_info(client.user_id().unwrap())
5023            .await
5024            .expect("Room member info should be available");
5025
5026        // We get the member info for the current user
5027        assert_eq!(ret.room_member.event().user_id(), user_id);
5028
5029        // But there is no info for the sender
5030        assert!(ret.sender_info.is_none());
5031    }
5032
5033    #[async_test]
5034    async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5035        let server = MatrixMockServer::new().await;
5036        let client = server.client_builder().build().await;
5037        let room_id = room_id!("!a:b.c");
5038        let user_id = user_id!("@example:localhost");
5039
5040        let f = EventFactory::new().room(room_id).sender(user_id);
5041        let joined_room_builder =
5042            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5043        let room = server.sync_room(&client, joined_room_builder).await;
5044
5045        // When we load the membership details
5046        let ret = room
5047            .member_with_sender_info(client.user_id().unwrap())
5048            .await
5049            .expect("Room member info should be available");
5050
5051        // We get the current user's member info
5052        assert_eq!(ret.room_member.event().user_id(), user_id);
5053
5054        // And the sender has the same info, since it's also the current user
5055        assert!(ret.sender_info.is_some());
5056        assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5057    }
5058
5059    #[async_test]
5060    async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5061        let server = MatrixMockServer::new().await;
5062        let client = server.client_builder().build().await;
5063        let room_id = room_id!("!a:b.c");
5064        let user_id = user_id!("@example:localhost");
5065        let sender_id = user_id!("@alice:b.c");
5066
5067        let f = EventFactory::new().room(room_id).sender(sender_id);
5068        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5069            f.member(user_id).into(),
5070            // The sender info comes from the sync
5071            f.member(sender_id).into(),
5072        ]);
5073        let room = server.sync_room(&client, joined_room_builder).await;
5074
5075        // When we load the membership details
5076        let ret = room
5077            .member_with_sender_info(client.user_id().unwrap())
5078            .await
5079            .expect("Room member info should be available");
5080
5081        // We get the current user's member info
5082        assert_eq!(ret.room_member.event().user_id(), user_id);
5083
5084        // And also the sender info from the events received in the sync
5085        assert!(ret.sender_info.is_some());
5086        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5087    }
5088
5089    #[async_test]
5090    async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5091        let server = MatrixMockServer::new().await;
5092        let client = server.client_builder().build().await;
5093        let room_id = room_id!("!a:b.c");
5094        let user_id = user_id!("@example:localhost");
5095        let sender_id = user_id!("@alice:b.c");
5096
5097        let f = EventFactory::new().room(room_id).sender(sender_id);
5098        let joined_room_builder =
5099            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5100        let room = server.sync_room(&client, joined_room_builder).await;
5101
5102        // We'll receive the member info through the /members endpoint
5103        server
5104            .mock_get_members()
5105            .ok(vec![f.member(sender_id).into_raw()])
5106            .mock_once()
5107            .mount()
5108            .await;
5109
5110        // We get the current user's member info
5111        let ret = room
5112            .member_with_sender_info(client.user_id().unwrap())
5113            .await
5114            .expect("Room member info should be available");
5115
5116        // We get the current user's member info
5117        assert_eq!(ret.room_member.event().user_id(), user_id);
5118
5119        // And also the sender info from the /members endpoint
5120        assert!(ret.sender_info.is_some());
5121        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5122    }
5123
5124    #[async_test]
5125    async fn test_list_threads() {
5126        let server = MatrixMockServer::new().await;
5127        let client = server.client_builder().build().await;
5128
5129        let room_id = room_id!("!a:b.c");
5130        let sender_id = user_id!("@alice:b.c");
5131        let f = EventFactory::new().room(room_id).sender(sender_id);
5132
5133        let eid1 = event_id!("$1");
5134        let eid2 = event_id!("$2");
5135        let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5136        let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5137
5138        server
5139            .mock_room_threads()
5140            .ok(batch1.clone(), Some("prev_batch".to_owned()))
5141            .mock_once()
5142            .mount()
5143            .await;
5144        server
5145            .mock_room_threads()
5146            .match_from("prev_batch")
5147            .ok(batch2, None)
5148            .mock_once()
5149            .mount()
5150            .await;
5151
5152        let room = server.sync_joined_room(&client, room_id).await;
5153        let result =
5154            room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5155        assert_eq!(result.chunk.len(), 1);
5156        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5157        assert!(result.prev_batch_token.is_some());
5158
5159        let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5160        let result = room.list_threads(opts).await.expect("Failed to list threads");
5161        assert_eq!(result.chunk.len(), 1);
5162        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5163        assert!(result.prev_batch_token.is_none());
5164    }
5165
5166    #[async_test]
5167    async fn test_relations() {
5168        let server = MatrixMockServer::new().await;
5169        let client = server.client_builder().build().await;
5170
5171        let room_id = room_id!("!a:b.c");
5172        let sender_id = user_id!("@alice:b.c");
5173        let f = EventFactory::new().room(room_id).sender(sender_id);
5174
5175        let target_event_id = owned_event_id!("$target");
5176        let eid1 = event_id!("$1");
5177        let eid2 = event_id!("$2");
5178        let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5179        let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5180
5181        server
5182            .mock_room_relations()
5183            .match_target_event(target_event_id.clone())
5184            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5185            .mock_once()
5186            .mount()
5187            .await;
5188
5189        server
5190            .mock_room_relations()
5191            .match_target_event(target_event_id.clone())
5192            .match_from("next_batch")
5193            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5194            .mock_once()
5195            .mount()
5196            .await;
5197
5198        let room = server.sync_joined_room(&client, room_id).await;
5199
5200        // Main endpoint: no relation type filtered out.
5201        let mut opts = RelationsOptions {
5202            include_relations: IncludeRelations::AllRelations,
5203            ..Default::default()
5204        };
5205        let result = room
5206            .relations(target_event_id.clone(), opts.clone())
5207            .await
5208            .expect("Failed to list relations the first time");
5209        assert_eq!(result.chunk.len(), 1);
5210        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5211        assert!(result.prev_batch_token.is_none());
5212        assert!(result.next_batch_token.is_some());
5213        assert!(result.recursion_depth.is_none());
5214
5215        opts.from = result.next_batch_token;
5216        let result = room
5217            .relations(target_event_id, opts)
5218            .await
5219            .expect("Failed to list relations the second time");
5220        assert_eq!(result.chunk.len(), 1);
5221        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5222        assert!(result.prev_batch_token.is_none());
5223        assert!(result.next_batch_token.is_none());
5224        assert!(result.recursion_depth.is_none());
5225    }
5226
5227    #[async_test]
5228    async fn test_relations_with_reltype() {
5229        let server = MatrixMockServer::new().await;
5230        let client = server.client_builder().build().await;
5231
5232        let room_id = room_id!("!a:b.c");
5233        let sender_id = user_id!("@alice:b.c");
5234        let f = EventFactory::new().room(room_id).sender(sender_id);
5235
5236        let target_event_id = owned_event_id!("$target");
5237        let eid1 = event_id!("$1");
5238        let eid2 = event_id!("$2");
5239        let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5240        let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5241
5242        server
5243            .mock_room_relations()
5244            .match_target_event(target_event_id.clone())
5245            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5246            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5247            .mock_once()
5248            .mount()
5249            .await;
5250
5251        server
5252            .mock_room_relations()
5253            .match_target_event(target_event_id.clone())
5254            .match_from("next_batch")
5255            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5256            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5257            .mock_once()
5258            .mount()
5259            .await;
5260
5261        let room = server.sync_joined_room(&client, room_id).await;
5262
5263        // Reltype-filtered endpoint, for threads \o/
5264        let mut opts = RelationsOptions {
5265            include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5266            ..Default::default()
5267        };
5268        let result = room
5269            .relations(target_event_id.clone(), opts.clone())
5270            .await
5271            .expect("Failed to list relations the first time");
5272        assert_eq!(result.chunk.len(), 1);
5273        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5274        assert!(result.prev_batch_token.is_none());
5275        assert!(result.next_batch_token.is_some());
5276        assert!(result.recursion_depth.is_none());
5277
5278        opts.from = result.next_batch_token;
5279        let result = room
5280            .relations(target_event_id, opts)
5281            .await
5282            .expect("Failed to list relations the second time");
5283        assert_eq!(result.chunk.len(), 1);
5284        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5285        assert!(result.prev_batch_token.is_none());
5286        assert!(result.next_batch_token.is_none());
5287        assert!(result.recursion_depth.is_none());
5288    }
5289
5290    #[async_test]
5291    async fn test_power_levels_computation() {
5292        let server = MatrixMockServer::new().await;
5293        let client = server.client_builder().build().await;
5294
5295        let room_id = room_id!("!a:b.c");
5296        let sender_id = client.user_id().expect("No session id");
5297        let f = EventFactory::new().room(room_id).sender(sender_id);
5298        let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5299
5300        // Computing the power levels will need these 3 state events:
5301        let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5302        let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5303        let room_member_event = f.member(sender_id).into();
5304
5305        // With only the room member event
5306        let room = server
5307            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5308            .await;
5309        let ctx = room
5310            .push_condition_room_ctx()
5311            .await
5312            .expect("Failed to get push condition context")
5313            .expect("Could not get push condition context");
5314
5315        // The internal power levels couldn't be computed
5316        assert!(ctx.power_levels.is_none());
5317
5318        // Adding the room creation event
5319        let room = server
5320            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5321            .await;
5322        let ctx = room
5323            .push_condition_room_ctx()
5324            .await
5325            .expect("Failed to get push condition context")
5326            .expect("Could not get push condition context");
5327
5328        // The internal power levels still couldn't be computed
5329        assert!(ctx.power_levels.is_none());
5330
5331        // With the room member, room creation and the power levels events
5332        let room = server
5333            .sync_room(
5334                &client,
5335                JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5336            )
5337            .await;
5338        let ctx = room
5339            .push_condition_room_ctx()
5340            .await
5341            .expect("Failed to get push condition context")
5342            .expect("Could not get push condition context");
5343
5344        // The internal power levels can finally be computed
5345        assert!(ctx.power_levels.is_some());
5346    }
5347}