matrix_sdk/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18    borrow::Borrow,
19    collections::{BTreeMap, HashMap},
20    future::Future,
21    ops::Deref,
22    sync::Arc,
23    time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30    StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
39pub use matrix_sdk_base::store::StoredThreadSubscription;
40use matrix_sdk_base::{
41    ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
42    StateChanges, StateStoreDataKey, StateStoreDataValue,
43    deserialized_responses::{
44        RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
45    },
46    media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
47    store::{StateStoreExt, ThreadSubscriptionStatus},
48};
49#[cfg(feature = "e2e-encryption")]
50use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
51#[cfg(feature = "e2e-encryption")]
52use matrix_sdk_common::BoxFuture;
53use matrix_sdk_common::{
54    deserialized_responses::TimelineEvent,
55    executor::{JoinHandle, spawn},
56    timeout::timeout,
57};
58#[cfg(feature = "experimental-search")]
59use matrix_sdk_search::error::IndexError;
60#[cfg(feature = "experimental-search")]
61#[cfg(doc)]
62use matrix_sdk_search::index::RoomIndex;
63use mime::Mime;
64use reply::Reply;
65#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
66use ruma::events::AnySyncMessageLikeEvent;
67#[cfg(feature = "experimental-encrypted-state-events")]
68use ruma::events::AnySyncStateEvent;
69#[cfg(feature = "unstable-msc4274")]
70use ruma::events::room::message::GalleryItemType;
71#[cfg(feature = "e2e-encryption")]
72use ruma::events::{
73    AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
74};
75use ruma::{
76    EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
77    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
78    api::client::{
79        config::{set_global_account_data, set_room_account_data},
80        context,
81        error::ErrorKind,
82        filter::LazyLoadOptions,
83        membership::{
84            Invite3pid, ban_user, forget_room, get_member_events,
85            invite_user::{self, v3::InvitationRecipient},
86            kick_user, leave_room, unban_user,
87        },
88        message::send_message_event,
89        read_marker::set_read_marker,
90        receipt::create_receipt,
91        redact::redact_event,
92        room::{get_room_event, report_content, report_room},
93        state::{get_state_event_for_key, send_state_event},
94        tag::{create_tag, delete_tag},
95        threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
96        typing::create_typing_event::{self, v3::Typing},
97    },
98    assign,
99    events::{
100        AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
101        Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
102        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
103        RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
104        StaticStateEventContent, SyncStateEvent,
105        beacon::BeaconEventContent,
106        beacon_info::BeaconInfoEventContent,
107        direct::DirectEventContent,
108        marked_unread::MarkedUnreadEventContent,
109        receipt::{Receipt, ReceiptThread, ReceiptType},
110        room::{
111            ImageInfo, MediaSource, ThumbnailInfo,
112            avatar::{self, RoomAvatarEventContent},
113            encryption::RoomEncryptionEventContent,
114            history_visibility::HistoryVisibility,
115            member::{MembershipChange, SyncRoomMemberEvent},
116            message::{
117                AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
118                ImageMessageEventContent, MessageType, RoomMessageEventContent,
119                TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
120                UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
121            },
122            name::RoomNameEventContent,
123            pinned_events::RoomPinnedEventsEventContent,
124            power_levels::{
125                RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
126            },
127            server_acl::RoomServerAclEventContent,
128            topic::RoomTopicEventContent,
129        },
130        space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
131        tag::{TagInfo, TagName},
132        typing::SyncTypingEvent,
133    },
134    int,
135    push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
136    serde::Raw,
137    time::Instant,
138};
139#[cfg(feature = "experimental-encrypted-state-events")]
140use ruma::{
141    events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
142    serde::JsonCastable,
143};
144use serde::de::DeserializeOwned;
145use thiserror::Error;
146use tokio::{join, sync::broadcast};
147use tracing::{debug, error, info, instrument, trace, warn};
148
149use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
150pub use self::{
151    member::{RoomMember, RoomMemberRole},
152    messages::{
153        EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
154        Relations, RelationsOptions, ThreadRoots,
155    },
156};
157#[cfg(doc)]
158use crate::event_cache::EventCache;
159#[cfg(feature = "experimental-encrypted-state-events")]
160use crate::room::futures::{SendRawStateEvent, SendStateEvent};
161use crate::{
162    BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
163    attachment::{AttachmentConfig, AttachmentInfo},
164    client::WeakClient,
165    config::RequestConfig,
166    error::{BeaconError, WrongRoomState},
167    event_cache::{self, EventCacheDropHandles, RoomEventCache},
168    event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
169    live_location_share::ObservableLiveLocation,
170    media::{MediaFormat, MediaRequestParameters},
171    notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
172    room::{
173        knock_requests::{KnockRequest, KnockRequestMemberInfo},
174        power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
175        privacy_settings::RoomPrivacySettings,
176    },
177    sync::RoomUpdate,
178    utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
179};
180#[cfg(feature = "e2e-encryption")]
181use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
182
183pub mod edit;
184pub mod futures;
185pub mod identity_status_changes;
186/// Contains code related to requests to join a room.
187pub mod knock_requests;
188mod member;
189mod messages;
190pub mod power_levels;
191pub mod reply;
192
193pub mod calls;
194
195/// Contains all the functionality for modifying the privacy settings in a room.
196pub mod privacy_settings;
197
198#[cfg(feature = "e2e-encryption")]
199pub(crate) mod shared_room_history;
200
201/// A struct containing methods that are common for Joined, Invited and Left
202/// Rooms
203#[derive(Debug, Clone)]
204pub struct Room {
205    inner: BaseRoom,
206    pub(crate) client: Client,
207}
208
209impl Deref for Room {
210    type Target = BaseRoom;
211
212    fn deref(&self) -> &Self::Target {
213        &self.inner
214    }
215}
216
217const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
218const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
219
220/// A thread subscription, according to the semantics of MSC4306.
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub struct ThreadSubscription {
223    /// Whether the subscription was made automatically by a client, not by
224    /// manual user choice.
225    pub automatic: bool,
226}
227
228/// Context allowing to compute the push actions for a given event.
229#[derive(Debug)]
230pub struct PushContext {
231    /// The Ruma context used to compute the push actions.
232    push_condition_room_ctx: PushConditionRoomCtx,
233
234    /// Push rules for this room, based on the push rules state event, or the
235    /// global server default as defined by [`Ruleset::server_default`].
236    push_rules: Ruleset,
237}
238
239impl PushContext {
240    /// Create a new [`PushContext`] from its inner components.
241    pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
242        Self { push_condition_room_ctx, push_rules }
243    }
244
245    /// Compute the push rules for a given event.
246    pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
247        self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
248    }
249
250    /// Compute the push rules for a given event, with extra logging to help
251    /// debugging.
252    #[doc(hidden)]
253    #[instrument(skip_all)]
254    pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
255        let rules = self
256            .push_rules
257            .iter()
258            .filter_map(|r| {
259                if !r.enabled() {
260                    return None;
261                }
262
263                let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
264
265                let conditions = match r {
266                    AnyPushRuleRef::Override(r) => {
267                        format!("{:?}", r.conditions)
268                    }
269                    AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
270                    AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
271                    AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
272                    AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
273                    _ => "<unknown push rule kind>".to_owned(),
274                };
275
276                Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
277            })
278            .collect::<Vec<_>>()
279            .join("\n");
280        trace!("rules:\n\n{rules}\n\n");
281
282        let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
283
284        if let Some(found) = found {
285            trace!("rule {} matched", found.rule_id());
286            found.actions().to_owned()
287        } else {
288            trace!("no match");
289            Vec::new()
290        }
291    }
292}
293
294macro_rules! make_media_type {
295    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
296        // If caption is set, use it as body, and filename as the file name; otherwise,
297        // body is the filename, and the filename is not set.
298        // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
299        let (body, formatted, filename) = match $caption {
300            Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
301            None => ($filename, None, None),
302        };
303
304        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
305
306        match $content_type.type_() {
307            mime::IMAGE => {
308                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
309                    mimetype: Some($content_type.as_ref().to_owned()),
310                    thumbnail_source,
311                    thumbnail_info
312                });
313                let content = assign!(ImageMessageEventContent::new(body, $source), {
314                    info: Some(Box::new(info)),
315                    formatted,
316                    filename
317                });
318                <$t>::Image(content)
319            }
320
321            mime::AUDIO => {
322                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
323                    formatted,
324                    filename
325                });
326
327                if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
328                 let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
329                    let waveform = waveform_vec
330                        .iter()
331                        .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
332                        .map(Into::into)
333                        .collect();
334                    content.audio =
335                        Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
336                }
337
338                if matches!($info, Some(AttachmentInfo::Voice(_))) {
339                    content.voice = Some(UnstableVoiceContentBlock::new());
340                }
341
342                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
343                audio_info.mimetype = Some($content_type.as_ref().to_owned());
344                let content = content.info(Box::new(audio_info));
345
346                <$t>::Audio(content)
347            }
348
349            mime::VIDEO => {
350                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
351                    mimetype: Some($content_type.as_ref().to_owned()),
352                    thumbnail_source,
353                    thumbnail_info
354                });
355                let content = assign!(VideoMessageEventContent::new(body, $source), {
356                    info: Some(Box::new(info)),
357                    formatted,
358                    filename
359                });
360                <$t>::Video(content)
361            }
362
363            _ => {
364                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
365                    mimetype: Some($content_type.as_ref().to_owned()),
366                    thumbnail_source,
367                    thumbnail_info
368                });
369                let content = assign!(FileMessageEventContent::new(body, $source), {
370                    info: Some(Box::new(info)),
371                    formatted,
372                    filename,
373                });
374                <$t>::File(content)
375            }
376        }
377    }};
378}
379
380impl Room {
381    /// Create a new `Room`
382    ///
383    /// # Arguments
384    /// * `client` - The client used to make requests.
385    ///
386    /// * `room` - The underlying room.
387    pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
388        Self { inner: room, client }
389    }
390
391    /// Leave this room.
392    /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
393    /// automatically.
394    ///
395    /// Only invited and joined rooms can be left.
396    #[doc(alias = "reject_invitation")]
397    #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
398    async fn leave_impl(&self) -> (Result<()>, &Room) {
399        let state = self.state();
400        if state == RoomState::Left {
401            return (
402                Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
403                    "Joined or Invited",
404                    state,
405                )))),
406                self,
407            );
408        }
409
410        // If the room was in Invited state we should also forget it when declining the
411        // invite.
412        let should_forget = matches!(self.state(), RoomState::Invited);
413
414        let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
415        let response = self.client.send(request).await;
416
417        // The server can return with an error that is acceptable to ignore. Let's find
418        // which one.
419        if let Err(error) = response {
420            #[allow(clippy::collapsible_match)]
421            let ignore_error = if let Some(error) = error.client_api_error_kind() {
422                match error {
423                    // The user is trying to leave a room but doesn't have permissions to do so.
424                    // Let's consider the user has left the room.
425                    ErrorKind::Forbidden { .. } => true,
426                    _ => false,
427                }
428            } else {
429                false
430            };
431
432            error!(?error, ignore_error, should_forget, "Failed to leave the room");
433
434            if !ignore_error {
435                return (Err(error.into()), self);
436            }
437        }
438
439        if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
440            return (Err(e.into()), self);
441        }
442
443        if should_forget {
444            trace!("Trying to forget the room");
445
446            if let Err(error) = self.forget().await {
447                error!(?error, "Failed to forget the room");
448            }
449        }
450
451        (Ok(()), self)
452    }
453
454    /// Leave this room and all predecessors.
455    /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
456    /// automatically.
457    ///
458    /// Only invited and joined rooms can be left.
459    /// Will return an error if the current room fails to leave but
460    /// will only warn if a predecessor fails to leave.
461    pub async fn leave(&self) -> Result<()> {
462        let mut rooms: Vec<Room> = vec![self.clone()];
463        let mut current_room = self;
464
465        while let Some(predecessor) = current_room.predecessor_room() {
466            let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
467
468            if let Some(predecessor_room) = maybe_predecessor_room {
469                rooms.push(predecessor_room.clone());
470                current_room = rooms.last().expect("Room just pushed so can't be empty");
471            } else {
472                warn!("Cannot find predecessor room");
473                break;
474            }
475        }
476
477        let batch_size = 5;
478
479        let rooms_futures: Vec<_> = rooms
480            .iter()
481            .filter_map(|room| match room.state() {
482                RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
483                    Some(room.leave_impl())
484                }
485                RoomState::Banned | RoomState::Left => None,
486            })
487            .collect();
488
489        let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
490
491        let mut maybe_this_room_failed_with: Option<Error> = None;
492
493        while let Some(result) = futures_stream.next().await {
494            if let (Err(e), room) = result {
495                if room.room_id() == self.room_id() {
496                    maybe_this_room_failed_with = Some(e);
497                } else {
498                    warn!("Failure while attempting to leave predecessor room: {e:?}");
499                }
500            }
501        }
502
503        maybe_this_room_failed_with.map_or(Ok(()), Err)
504    }
505
506    /// Join this room.
507    ///
508    /// Only invited and left rooms can be joined via this method.
509    #[doc(alias = "accept_invitation")]
510    pub async fn join(&self) -> Result<()> {
511        let prev_room_state = self.inner.state();
512
513        if prev_room_state == RoomState::Joined {
514            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
515                "Invited or Left",
516                prev_room_state,
517            ))));
518        }
519
520        self.client.join_room_by_id(self.room_id()).await?;
521
522        Ok(())
523    }
524
525    /// Get the inner client saved in this room instance.
526    ///
527    /// Returns the client this room is part of.
528    pub fn client(&self) -> Client {
529        self.client.clone()
530    }
531
532    /// Get the sync state of this room, i.e. whether it was fully synced with
533    /// the server.
534    pub fn is_synced(&self) -> bool {
535        self.inner.is_state_fully_synced()
536    }
537
538    /// Gets the avatar of this room, if set.
539    ///
540    /// Returns the avatar.
541    /// If a thumbnail is requested no guarantee on the size of the image is
542    /// given.
543    ///
544    /// # Arguments
545    ///
546    /// * `format` - The desired format of the avatar.
547    ///
548    /// # Examples
549    ///
550    /// ```no_run
551    /// # use matrix_sdk::Client;
552    /// # use matrix_sdk::ruma::room_id;
553    /// # use matrix_sdk::media::MediaFormat;
554    /// # use url::Url;
555    /// # let homeserver = Url::parse("http://example.com").unwrap();
556    /// # async {
557    /// # let user = "example";
558    /// let client = Client::new(homeserver).await.unwrap();
559    /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
560    /// let room_id = room_id!("!roomid:example.com");
561    /// let room = client.get_room(&room_id).unwrap();
562    /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
563    ///     std::fs::write("avatar.png", avatar);
564    /// }
565    /// # };
566    /// ```
567    pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
568        let Some(url) = self.avatar_url() else { return Ok(None) };
569        let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
570        Ok(Some(self.client.media().get_media_content(&request, true).await?))
571    }
572
573    /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
574    /// returns a `Messages` struct that contains a chunk of room and state
575    /// events (`RoomEvent` and `AnyStateEvent`).
576    ///
577    /// With the encryption feature, messages are decrypted if possible. If
578    /// decryption fails for an individual message, that message is returned
579    /// undecrypted.
580    ///
581    /// # Examples
582    ///
583    /// ```no_run
584    /// use matrix_sdk::{Client, room::MessagesOptions};
585    /// # use matrix_sdk::ruma::{
586    /// #     api::client::filter::RoomEventFilter,
587    /// #     room_id,
588    /// # };
589    /// # use url::Url;
590    ///
591    /// # let homeserver = Url::parse("http://example.com").unwrap();
592    /// # async {
593    /// let options =
594    ///     MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
595    ///
596    /// let mut client = Client::new(homeserver).await.unwrap();
597    /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
598    /// assert!(room.messages(options).await.is_ok());
599    /// # };
600    /// ```
601    #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
602    pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
603        let room_id = self.inner.room_id();
604        let request = options.into_request(room_id);
605        let http_response = self.client.send(request).await?;
606
607        let push_ctx = self.push_context().await?;
608        let chunk = join_all(
609            http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
610        )
611        .await;
612
613        Ok(Messages {
614            start: http_response.start,
615            end: http_response.end,
616            chunk,
617            state: http_response.state,
618        })
619    }
620
621    /// Register a handler for events of a specific type, within this room.
622    ///
623    /// This method works the same way as [`Client::add_event_handler`], except
624    /// that the handler will only be called for events within this room. See
625    /// that method for more details on event handler functions.
626    ///
627    /// `room.add_event_handler(hdl)` is equivalent to
628    /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
629    /// convenient in your use case.
630    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
631    where
632        Ev: SyncEvent + DeserializeOwned + Send + 'static,
633        H: EventHandler<Ev, Ctx>,
634    {
635        self.client.add_room_event_handler(self.room_id(), handler)
636    }
637
638    /// Subscribe to all updates for this room.
639    ///
640    /// The returned receiver will receive a new message for each sync response
641    /// that contains updates for this room.
642    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
643        self.client.subscribe_to_room_updates(self.room_id())
644    }
645
646    /// Subscribe to typing notifications for this room.
647    ///
648    /// The returned receiver will receive a new vector of user IDs for each
649    /// sync response that contains 'm.typing' event. The current user ID will
650    /// be filtered out.
651    pub fn subscribe_to_typing_notifications(
652        &self,
653    ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
654        let (sender, receiver) = broadcast::channel(16);
655        let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
656            let own_user_id = self.own_user_id().to_owned();
657            move |event: SyncTypingEvent| async move {
658                // Ignore typing notifications from own user.
659                let typing_user_ids = event
660                    .content
661                    .user_ids
662                    .into_iter()
663                    .filter(|user_id| *user_id != own_user_id)
664                    .collect();
665                // Ignore the result. It can only fail if there are no listeners.
666                let _ = sender.send(typing_user_ids);
667            }
668        });
669        let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
670        (drop_guard, receiver)
671    }
672
673    /// Subscribe to updates about users who are in "pin violation" i.e. their
674    /// identity has changed and the user has not yet acknowledged this.
675    ///
676    /// The returned receiver will receive a new vector of
677    /// [`IdentityStatusChange`] each time a /keys/query response shows a
678    /// changed identity for a member of this room, or a sync shows a change
679    /// to the membership of an affected user. (Changes to the current user are
680    /// not directly included, but some changes to the current user's identity
681    /// can trigger changes to how we see other users' identities, which
682    /// will be included.)
683    ///
684    /// The first item in the stream provides the current state of the room:
685    /// each member of the room who is not in "pinned" or "verified" state will
686    /// be included (except the current user).
687    ///
688    /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
689    /// `PinViolation` then a warning should be displayed to the user. If it is
690    /// set to `Pinned` then no warning should be displayed.
691    ///
692    /// Note that if a user who is in pin violation leaves the room, a `Pinned`
693    /// update is sent, to indicate that the warning should be removed, even
694    /// though the user's identity is not necessarily pinned.
695    #[cfg(feature = "e2e-encryption")]
696    pub async fn subscribe_to_identity_status_changes(
697        &self,
698    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
699        IdentityStatusChanges::create_stream(self.clone()).await
700    }
701
702    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
703    /// decrypted if needs be.
704    ///
705    /// Only logs from the crypto crate will indicate a failure to decrypt.
706    #[cfg(not(feature = "experimental-encrypted-state-events"))]
707    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
708    async fn try_decrypt_event(
709        &self,
710        event: Raw<AnyTimelineEvent>,
711        push_ctx: Option<&PushContext>,
712    ) -> TimelineEvent {
713        #[cfg(feature = "e2e-encryption")]
714        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
715            SyncMessageLikeEvent::Original(_),
716        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
717            && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
718        {
719            return event;
720        }
721
722        let mut event = TimelineEvent::from_plaintext(event.cast());
723        if let Some(push_ctx) = push_ctx {
724            event.set_push_actions(push_ctx.for_event(event.raw()).await);
725        }
726
727        event
728    }
729
730    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
731    /// decrypted if needs be.
732    ///
733    /// Only logs from the crypto crate will indicate a failure to decrypt.
734    #[cfg(feature = "experimental-encrypted-state-events")]
735    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
736    async fn try_decrypt_event(
737        &self,
738        event: Raw<AnyTimelineEvent>,
739        push_ctx: Option<&PushContext>,
740    ) -> TimelineEvent {
741        // If we have either an encrypted message-like or state event, try to decrypt.
742        match event.deserialize_as::<AnySyncTimelineEvent>() {
743            Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
744                SyncMessageLikeEvent::Original(_),
745            ))) => {
746                if let Ok(event) = self
747                    .decrypt_event(
748                        event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
749                        push_ctx,
750                    )
751                    .await
752                {
753                    return event;
754                }
755            }
756            Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
757                SyncStateEvent::Original(_),
758            ))) => {
759                if let Ok(event) = self
760                    .decrypt_event(
761                        event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
762                        push_ctx,
763                    )
764                    .await
765                {
766                    return event;
767                }
768            }
769            _ => {}
770        }
771
772        let mut event = TimelineEvent::from_plaintext(event.cast());
773        if let Some(push_ctx) = push_ctx {
774            event.set_push_actions(push_ctx.for_event(event.raw()).await);
775        }
776
777        event
778    }
779
780    /// Fetch the event with the given `EventId` in this room.
781    ///
782    /// It uses the given [`RequestConfig`] if provided, or the client's default
783    /// one otherwise.
784    pub async fn event(
785        &self,
786        event_id: &EventId,
787        request_config: Option<RequestConfig>,
788    ) -> Result<TimelineEvent> {
789        let request =
790            get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
791
792        let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
793        let push_ctx = self.push_context().await?;
794        let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
795
796        // Save the event into the event cache, if it's set up.
797        if let Ok((cache, _handles)) = self.event_cache().await {
798            cache.save_events([event.clone()]).await;
799        }
800
801        Ok(event)
802    }
803
804    /// Try to load the event from the [`EventCache`][crate::event_cache], if
805    /// it's enabled, or fetch it from the homeserver.
806    ///
807    /// When running the request against the homeserver, it uses the given
808    /// [`RequestConfig`] if provided, or the client's default one
809    /// otherwise.
810    pub async fn load_or_fetch_event(
811        &self,
812        event_id: &EventId,
813        request_config: Option<RequestConfig>,
814    ) -> Result<TimelineEvent> {
815        match self.event_cache().await {
816            Ok((event_cache, _drop_handles)) => {
817                if let Some(event) = event_cache.find_event(event_id).await {
818                    return Ok(event);
819                }
820                // Fallthrough: try with a request.
821            }
822            Err(err) => {
823                debug!("error when getting the event cache: {err}");
824            }
825        }
826        self.event(event_id, request_config).await
827    }
828
829    /// Fetch the event with the given `EventId` in this room, using the
830    /// `/context` endpoint to get more information.
831    pub async fn event_with_context(
832        &self,
833        event_id: &EventId,
834        lazy_load_members: bool,
835        context_size: UInt,
836        request_config: Option<RequestConfig>,
837    ) -> Result<EventWithContextResponse> {
838        let mut request =
839            context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
840
841        request.limit = context_size;
842
843        if lazy_load_members {
844            request.filter.lazy_load_options =
845                LazyLoadOptions::Enabled { include_redundant_members: false };
846        }
847
848        let response = self.client.send(request).with_request_config(request_config).await?;
849
850        let push_ctx = self.push_context().await?;
851        let push_ctx = push_ctx.as_ref();
852        let target_event = if let Some(event) = response.event {
853            Some(self.try_decrypt_event(event, push_ctx).await)
854        } else {
855            None
856        };
857
858        // Note: the joined future will fail if any future failed, but
859        // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
860        // decryption error, so we should prevent against most bad cases here.
861        let (events_before, events_after) = join!(
862            join_all(
863                response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
864            ),
865            join_all(
866                response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
867            ),
868        );
869
870        // Save the loaded events into the event cache, if it's set up.
871        if let Ok((cache, _handles)) = self.event_cache().await {
872            let mut events_to_save: Vec<TimelineEvent> = Vec::new();
873            if let Some(event) = &target_event {
874                events_to_save.push(event.clone());
875            }
876
877            for event in &events_before {
878                events_to_save.push(event.clone());
879            }
880
881            for event in &events_after {
882                events_to_save.push(event.clone());
883            }
884
885            cache.save_events(events_to_save).await;
886        }
887
888        Ok(EventWithContextResponse {
889            event: target_event,
890            events_before,
891            events_after,
892            state: response.state,
893            prev_batch_token: response.start,
894            next_batch_token: response.end,
895        })
896    }
897
898    pub(crate) async fn request_members(&self) -> Result<()> {
899        self.client
900            .locks()
901            .members_request_deduplicated_handler
902            .run(self.room_id().to_owned(), async move {
903                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
904                let response = self
905                    .client
906                    .send(request.clone())
907                    .with_request_config(
908                        // In some cases it can take longer than 30s to load:
909                        // https://github.com/element-hq/synapse/issues/16872
910                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
911                    )
912                    .await?;
913
914                // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
915                Box::pin(self.client.base_client().receive_all_members(
916                    self.room_id(),
917                    &request,
918                    &response,
919                ))
920                .await?;
921
922                Ok(())
923            })
924            .await
925    }
926
927    /// Request to update the encryption state for this room.
928    ///
929    /// It does nothing if the encryption state is already
930    /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
931    pub async fn request_encryption_state(&self) -> Result<()> {
932        if !self.inner.encryption_state().is_unknown() {
933            return Ok(());
934        }
935
936        self.client
937            .locks()
938            .encryption_state_deduplicated_handler
939            .run(self.room_id().to_owned(), async move {
940                // Request the event from the server.
941                let request = get_state_event_for_key::v3::Request::new(
942                    self.room_id().to_owned(),
943                    StateEventType::RoomEncryption,
944                    "".to_owned(),
945                );
946                let response = match self.client.send(request).await {
947                    Ok(response) => Some(
948                        response
949                            .into_content()
950                            .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
951                    ),
952                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
953                    Err(err) => return Err(err.into()),
954                };
955
956                let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
957
958                // Persist the event and the fact that we requested it from the server in
959                // `RoomInfo`.
960                let mut room_info = self.clone_info();
961                room_info.mark_encryption_state_synced();
962                room_info.set_encryption_event(response.clone());
963                let mut changes = StateChanges::default();
964                changes.add_room(room_info.clone());
965
966                self.client.state_store().save_changes(&changes).await?;
967                self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
968
969                Ok(())
970            })
971            .await
972    }
973
974    /// Check the encryption state of this room.
975    ///
976    /// If the result is [`EncryptionState::Unknown`], one might want to call
977    /// [`Room::request_encryption_state`].
978    pub fn encryption_state(&self) -> EncryptionState {
979        self.inner.encryption_state()
980    }
981
982    /// Force to update the encryption state by calling
983    /// [`Room::request_encryption_state`], and then calling
984    /// [`Room::encryption_state`].
985    ///
986    /// This method is useful to ensure the encryption state is up-to-date.
987    pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
988        self.request_encryption_state().await?;
989
990        Ok(self.encryption_state())
991    }
992
993    /// Gets additional context info about the client crypto.
994    #[cfg(feature = "e2e-encryption")]
995    pub async fn crypto_context_info(&self) -> CryptoContextInfo {
996        let encryption = self.client.encryption();
997
998        let this_device_is_verified = match encryption.get_own_device().await {
999            Ok(Some(device)) => device.is_verified_with_cross_signing(),
1000
1001            // Should not happen, there will always be an own device
1002            _ => true,
1003        };
1004
1005        let backup_exists_on_server =
1006            encryption.backups().exists_on_server().await.unwrap_or(false);
1007
1008        CryptoContextInfo {
1009            device_creation_ts: encryption.device_creation_timestamp().await,
1010            this_device_is_verified,
1011            is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1012            backup_exists_on_server,
1013        }
1014    }
1015
1016    fn are_events_visible(&self) -> bool {
1017        if let RoomState::Invited = self.inner.state() {
1018            return matches!(
1019                self.inner.history_visibility_or_default(),
1020                HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1021            );
1022        }
1023
1024        true
1025    }
1026
1027    /// Sync the member list with the server.
1028    ///
1029    /// This method will de-duplicate requests if it is called multiple times in
1030    /// quick succession, in that case the return value will be `None`. This
1031    /// method does nothing if the members are already synced.
1032    pub async fn sync_members(&self) -> Result<()> {
1033        if !self.are_events_visible() {
1034            return Ok(());
1035        }
1036
1037        if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1038    }
1039
1040    /// Get a specific member of this room.
1041    ///
1042    /// *Note*: This method will fetch the members from the homeserver if the
1043    /// member list isn't synchronized due to member lazy loading. Because of
1044    /// that it might panic if it isn't run on a tokio thread.
1045    ///
1046    /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1047    /// method that doesn't do any requests.
1048    ///
1049    /// # Arguments
1050    ///
1051    /// * `user_id` - The ID of the user that should be fetched out of the
1052    ///   store.
1053    pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1054        self.sync_members().await?;
1055        self.get_member_no_sync(user_id).await
1056    }
1057
1058    /// Get a specific member of this room.
1059    ///
1060    /// *Note*: This method will not fetch the members from the homeserver if
1061    /// the member list isn't synchronized due to member lazy loading. Thus,
1062    /// members could be missing.
1063    ///
1064    /// Use [get_member()](#method.get_member) if you want to ensure to always
1065    /// have the full member list to chose from.
1066    ///
1067    /// # Arguments
1068    ///
1069    /// * `user_id` - The ID of the user that should be fetched out of the
1070    ///   store.
1071    pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1072        Ok(self
1073            .inner
1074            .get_member(user_id)
1075            .await?
1076            .map(|member| RoomMember::new(self.client.clone(), member)))
1077    }
1078
1079    /// Get members for this room, with the given memberships.
1080    ///
1081    /// *Note*: This method will fetch the members from the homeserver if the
1082    /// member list isn't synchronized due to member lazy loading. Because of
1083    /// that it might panic if it isn't run on a tokio thread.
1084    ///
1085    /// Use [members_no_sync()](#method.members_no_sync) if you want a
1086    /// method that doesn't do any requests.
1087    pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1088        self.sync_members().await?;
1089        self.members_no_sync(memberships).await
1090    }
1091
1092    /// Get members for this room, with the given memberships.
1093    ///
1094    /// *Note*: This method will not fetch the members from the homeserver if
1095    /// the member list isn't synchronized due to member lazy loading. Thus,
1096    /// members could be missing.
1097    ///
1098    /// Use [members()](#method.members) if you want to ensure to always get
1099    /// the full member list.
1100    pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1101        Ok(self
1102            .inner
1103            .members(memberships)
1104            .await?
1105            .into_iter()
1106            .map(|member| RoomMember::new(self.client.clone(), member))
1107            .collect())
1108    }
1109
1110    /// Get all state events of a given type in this room.
1111    pub async fn get_state_events(
1112        &self,
1113        event_type: StateEventType,
1114    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1115        self.client
1116            .state_store()
1117            .get_state_events(self.room_id(), event_type)
1118            .await
1119            .map_err(Into::into)
1120    }
1121
1122    /// Get all state events of a given statically-known type in this room.
1123    ///
1124    /// # Examples
1125    ///
1126    /// ```no_run
1127    /// # async {
1128    /// # let room: matrix_sdk::Room = todo!();
1129    /// use matrix_sdk::ruma::{
1130    ///     events::room::member::RoomMemberEventContent, serde::Raw,
1131    /// };
1132    ///
1133    /// let room_members =
1134    ///     room.get_state_events_static::<RoomMemberEventContent>().await?;
1135    /// # anyhow::Ok(())
1136    /// # };
1137    /// ```
1138    pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1139    where
1140        C: StaticEventContent<IsPrefix = ruma::events::False>
1141            + StaticStateEventContent
1142            + RedactContent,
1143        C::Redacted: RedactedStateEventContent,
1144    {
1145        Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1146    }
1147
1148    /// Get the state events of a given type with the given state keys in this
1149    /// room.
1150    pub async fn get_state_events_for_keys(
1151        &self,
1152        event_type: StateEventType,
1153        state_keys: &[&str],
1154    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1155        self.client
1156            .state_store()
1157            .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1158            .await
1159            .map_err(Into::into)
1160    }
1161
1162    /// Get the state events of a given statically-known type with the given
1163    /// state keys in this room.
1164    ///
1165    /// # Examples
1166    ///
1167    /// ```no_run
1168    /// # async {
1169    /// # let room: matrix_sdk::Room = todo!();
1170    /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1171    /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1172    ///
1173    /// let room_members = room
1174    ///     .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1175    ///         user_ids,
1176    ///     )
1177    ///     .await?;
1178    /// # anyhow::Ok(())
1179    /// # };
1180    /// ```
1181    pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1182        &self,
1183        state_keys: I,
1184    ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1185    where
1186        C: StaticEventContent<IsPrefix = ruma::events::False>
1187            + StaticStateEventContent
1188            + RedactContent,
1189        C::StateKey: Borrow<K>,
1190        C::Redacted: RedactedStateEventContent,
1191        K: AsRef<str> + Sized + Sync + 'a,
1192        I: IntoIterator<Item = &'a K> + Send,
1193        I::IntoIter: Send,
1194    {
1195        Ok(self
1196            .client
1197            .state_store()
1198            .get_state_events_for_keys_static(self.room_id(), state_keys)
1199            .await?)
1200    }
1201
1202    /// Get a specific state event in this room.
1203    pub async fn get_state_event(
1204        &self,
1205        event_type: StateEventType,
1206        state_key: &str,
1207    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1208        self.client
1209            .state_store()
1210            .get_state_event(self.room_id(), event_type, state_key)
1211            .await
1212            .map_err(Into::into)
1213    }
1214
1215    /// Get a specific state event of statically-known type with an empty state
1216    /// key in this room.
1217    ///
1218    /// # Examples
1219    ///
1220    /// ```no_run
1221    /// # async {
1222    /// # let room: matrix_sdk::Room = todo!();
1223    /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1224    ///
1225    /// let power_levels = room
1226    ///     .get_state_event_static::<RoomPowerLevelsEventContent>()
1227    ///     .await?
1228    ///     .expect("every room has a power_levels event")
1229    ///     .deserialize()?;
1230    /// # anyhow::Ok(())
1231    /// # };
1232    /// ```
1233    pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1234    where
1235        C: StaticEventContent<IsPrefix = ruma::events::False>
1236            + StaticStateEventContent<StateKey = EmptyStateKey>
1237            + RedactContent,
1238        C::Redacted: RedactedStateEventContent,
1239    {
1240        self.get_state_event_static_for_key(&EmptyStateKey).await
1241    }
1242
1243    /// Get a specific state event of statically-known type in this room.
1244    ///
1245    /// # Examples
1246    ///
1247    /// ```no_run
1248    /// # async {
1249    /// # let room: matrix_sdk::Room = todo!();
1250    /// use matrix_sdk::ruma::{
1251    ///     events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1252    /// };
1253    ///
1254    /// let member_event = room
1255    ///     .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1256    ///         "@alice:example.org"
1257    ///     ))
1258    ///     .await?;
1259    /// # anyhow::Ok(())
1260    /// # };
1261    /// ```
1262    pub async fn get_state_event_static_for_key<C, K>(
1263        &self,
1264        state_key: &K,
1265    ) -> Result<Option<RawSyncOrStrippedState<C>>>
1266    where
1267        C: StaticEventContent<IsPrefix = ruma::events::False>
1268            + StaticStateEventContent
1269            + RedactContent,
1270        C::StateKey: Borrow<K>,
1271        C::Redacted: RedactedStateEventContent,
1272        K: AsRef<str> + ?Sized + Sync,
1273    {
1274        Ok(self
1275            .client
1276            .state_store()
1277            .get_state_event_static_for_key(self.room_id(), state_key)
1278            .await?)
1279    }
1280
1281    /// Returns the parents this room advertises as its parents.
1282    ///
1283    /// Results are in no particular order.
1284    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1285        // Implements this algorithm:
1286        // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1287
1288        // Get all m.space.parent events for this room
1289        Ok(self
1290            .get_state_events_static::<SpaceParentEventContent>()
1291            .await?
1292            .into_iter()
1293            // Extract state key (ie. the parent's id) and sender
1294            .filter_map(|parent_event| match parent_event.deserialize() {
1295                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1296                    Some((e.state_key.to_owned(), e.sender))
1297                }
1298                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1299                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1300                Err(e) => {
1301                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1302                    None
1303                }
1304            })
1305            // Check whether the parent recognizes this room as its child
1306            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1307                let Some(parent_room) = self.client.get_room(&state_key) else {
1308                    // We are not in the room, cannot check if the relationship is reciprocal
1309                    // TODO: try peeking into the room
1310                    return Ok(ParentSpace::Unverifiable(state_key));
1311                };
1312                // Get the m.space.child state of the parent with this room's id
1313                // as state key.
1314                if let Some(child_event) = parent_room
1315                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1316                    .await?
1317                {
1318                    match child_event.deserialize() {
1319                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1320                            // There is a valid m.space.child in the parent pointing to
1321                            // this room
1322                            return Ok(ParentSpace::Reciprocal(parent_room));
1323                        }
1324                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1325                        Ok(SyncOrStrippedState::Stripped(_)) => {}
1326                        Err(e) => {
1327                            info!(
1328                                room_id = ?self.room_id(), parent_room_id = ?state_key,
1329                                "Could not deserialize m.space.child: {e}"
1330                            );
1331                        }
1332                    }
1333                    // Otherwise the event is either invalid or redacted. If
1334                    // redacted it would be missing the
1335                    // `via` key, thereby invalidating that end of the
1336                    // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1337                }
1338
1339                // No reciprocal m.space.child found, let's check if the sender has the
1340                // power to set it
1341                let Some(member) = parent_room.get_member(&sender).await? else {
1342                    // Sender is not even in the parent room
1343                    return Ok(ParentSpace::Illegitimate(parent_room));
1344                };
1345
1346                if member.can_send_state(StateEventType::SpaceChild) {
1347                    // Sender does have the power to set m.room.child
1348                    Ok(ParentSpace::WithPowerlevel(parent_room))
1349                } else {
1350                    Ok(ParentSpace::Illegitimate(parent_room))
1351                }
1352            })
1353            .collect::<FuturesUnordered<_>>())
1354    }
1355
1356    /// Read account data in this room, from storage.
1357    pub async fn account_data(
1358        &self,
1359        data_type: RoomAccountDataEventType,
1360    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1361        self.client
1362            .state_store()
1363            .get_room_account_data_event(self.room_id(), data_type)
1364            .await
1365            .map_err(Into::into)
1366    }
1367
1368    /// Get account data of a statically-known type in this room, from storage.
1369    ///
1370    /// # Examples
1371    ///
1372    /// ```no_run
1373    /// # async {
1374    /// # let room: matrix_sdk::Room = todo!();
1375    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1376    ///
1377    /// match room.account_data_static::<FullyReadEventContent>().await? {
1378    ///     Some(fully_read) => {
1379    ///         println!("Found read marker: {:?}", fully_read.deserialize()?)
1380    ///     }
1381    ///     None => println!("No read marker for this room"),
1382    /// }
1383    /// # anyhow::Ok(())
1384    /// # };
1385    /// ```
1386    pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1387    where
1388        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1389    {
1390        Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1391    }
1392
1393    /// Check if all members of this room are verified and all their devices are
1394    /// verified.
1395    ///
1396    /// Returns true if all devices in the room are verified, otherwise false.
1397    #[cfg(feature = "e2e-encryption")]
1398    pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1399        let user_ids = self
1400            .client
1401            .state_store()
1402            .get_user_ids(self.room_id(), RoomMemberships::empty())
1403            .await?;
1404
1405        for user_id in user_ids {
1406            let devices = self.client.encryption().get_user_devices(&user_id).await?;
1407            let any_unverified = devices.devices().any(|d| !d.is_verified());
1408
1409            if any_unverified {
1410                return Ok(false);
1411            }
1412        }
1413
1414        Ok(true)
1415    }
1416
1417    /// Set the given account data event for this room.
1418    ///
1419    /// # Example
1420    /// ```
1421    /// # async {
1422    /// # let room: matrix_sdk::Room = todo!();
1423    /// # let event_id: ruma::OwnedEventId = todo!();
1424    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1425    /// let content = FullyReadEventContent::new(event_id);
1426    ///
1427    /// room.set_account_data(content).await?;
1428    /// # anyhow::Ok(())
1429    /// # };
1430    /// ```
1431    pub async fn set_account_data<T>(
1432        &self,
1433        content: T,
1434    ) -> Result<set_room_account_data::v3::Response>
1435    where
1436        T: RoomAccountDataEventContent,
1437    {
1438        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1439
1440        let request = set_room_account_data::v3::Request::new(
1441            own_user.to_owned(),
1442            self.room_id().to_owned(),
1443            &content,
1444        )?;
1445
1446        Ok(self.client.send(request).await?)
1447    }
1448
1449    /// Set the given raw account data event in this room.
1450    ///
1451    /// # Example
1452    /// ```
1453    /// # async {
1454    /// # let room: matrix_sdk::Room = todo!();
1455    /// use matrix_sdk::ruma::{
1456    ///     events::{
1457    ///         AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1458    ///         marked_unread::MarkedUnreadEventContent,
1459    ///     },
1460    ///     serde::Raw,
1461    /// };
1462    /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1463    /// let full_event: AnyRoomAccountDataEventContent =
1464    ///     marked_unread_content.clone().into();
1465    /// room.set_account_data_raw(
1466    ///     marked_unread_content.event_type(),
1467    ///     Raw::new(&full_event).unwrap(),
1468    /// )
1469    /// .await?;
1470    /// # anyhow::Ok(())
1471    /// # };
1472    /// ```
1473    pub async fn set_account_data_raw(
1474        &self,
1475        event_type: RoomAccountDataEventType,
1476        content: Raw<AnyRoomAccountDataEventContent>,
1477    ) -> Result<set_room_account_data::v3::Response> {
1478        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1479
1480        let request = set_room_account_data::v3::Request::new_raw(
1481            own_user.to_owned(),
1482            self.room_id().to_owned(),
1483            event_type,
1484            content,
1485        );
1486
1487        Ok(self.client.send(request).await?)
1488    }
1489
1490    /// Adds a tag to the room, or updates it if it already exists.
1491    ///
1492    /// Returns the [`create_tag::v3::Response`] from the server.
1493    ///
1494    /// # Arguments
1495    /// * `tag` - The tag to add or update.
1496    ///
1497    /// * `tag_info` - Information about the tag, generally containing the
1498    ///   `order` parameter.
1499    ///
1500    /// # Examples
1501    ///
1502    /// ```no_run
1503    /// # use std::str::FromStr;
1504    /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1505    /// # async {
1506    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1507    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1508    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1509    /// use matrix_sdk::ruma::events::tag::TagInfo;
1510    ///
1511    /// if let Some(room) = client.get_room(&room_id) {
1512    ///     let mut tag_info = TagInfo::new();
1513    ///     tag_info.order = Some(0.9);
1514    ///     let user_tag = UserTagName::from_str("u.work")?;
1515    ///
1516    ///     room.set_tag(TagName::User(user_tag), tag_info).await?;
1517    /// }
1518    /// # anyhow::Ok(()) };
1519    /// ```
1520    pub async fn set_tag(
1521        &self,
1522        tag: TagName,
1523        tag_info: TagInfo,
1524    ) -> Result<create_tag::v3::Response> {
1525        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1526        let request = create_tag::v3::Request::new(
1527            user_id.to_owned(),
1528            self.inner.room_id().to_owned(),
1529            tag.to_string(),
1530            tag_info,
1531        );
1532        Ok(self.client.send(request).await?)
1533    }
1534
1535    /// Removes a tag from the room.
1536    ///
1537    /// Returns the [`delete_tag::v3::Response`] from the server.
1538    ///
1539    /// # Arguments
1540    /// * `tag` - The tag to remove.
1541    pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1542        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1543        let request = delete_tag::v3::Request::new(
1544            user_id.to_owned(),
1545            self.inner.room_id().to_owned(),
1546            tag.to_string(),
1547        );
1548        Ok(self.client.send(request).await?)
1549    }
1550
1551    /// Add or remove the `m.favourite` flag for this room.
1552    ///
1553    /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1554    /// room, the tag will be removed too.
1555    ///
1556    /// # Arguments
1557    ///
1558    /// * `is_favourite` - Whether to mark this room as favourite.
1559    /// * `tag_order` - The order of the tag if any.
1560    pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1561        if is_favourite {
1562            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1563
1564            self.set_tag(TagName::Favorite, tag_info).await?;
1565
1566            if self.is_low_priority() {
1567                self.remove_tag(TagName::LowPriority).await?;
1568            }
1569        } else {
1570            self.remove_tag(TagName::Favorite).await?;
1571        }
1572        Ok(())
1573    }
1574
1575    /// Add or remove the `m.lowpriority` flag for this room.
1576    ///
1577    /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1578    /// room, the tag will be removed too.
1579    ///
1580    /// # Arguments
1581    ///
1582    /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1583    /// * `tag_order` - The order of the tag if any.
1584    pub async fn set_is_low_priority(
1585        &self,
1586        is_low_priority: bool,
1587        tag_order: Option<f64>,
1588    ) -> Result<()> {
1589        if is_low_priority {
1590            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1591
1592            self.set_tag(TagName::LowPriority, tag_info).await?;
1593
1594            if self.is_favourite() {
1595                self.remove_tag(TagName::Favorite).await?;
1596            }
1597        } else {
1598            self.remove_tag(TagName::LowPriority).await?;
1599        }
1600        Ok(())
1601    }
1602
1603    /// Sets whether this room is a DM.
1604    ///
1605    /// When setting this room as DM, it will be marked as DM for all active
1606    /// members of the room. When unsetting this room as DM, it will be
1607    /// unmarked as DM for all users, not just the members.
1608    ///
1609    /// # Arguments
1610    /// * `is_direct` - Whether to mark this room as direct.
1611    pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1612        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1613
1614        let mut content = self
1615            .client
1616            .account()
1617            .account_data::<DirectEventContent>()
1618            .await?
1619            .map(|c| c.deserialize())
1620            .transpose()?
1621            .unwrap_or_default();
1622
1623        let this_room_id = self.inner.room_id();
1624
1625        if is_direct {
1626            let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1627            room_members.retain(|member| member.user_id() != self.own_user_id());
1628
1629            for member in room_members {
1630                let entry = content.entry(member.user_id().into()).or_default();
1631                if !entry.iter().any(|room_id| room_id == this_room_id) {
1632                    entry.push(this_room_id.to_owned());
1633                }
1634            }
1635        } else {
1636            for (_, list) in content.iter_mut() {
1637                list.retain(|room_id| *room_id != this_room_id);
1638            }
1639
1640            // Remove user ids that don't have any room marked as DM
1641            content.retain(|_, list| !list.is_empty());
1642        }
1643
1644        let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1645
1646        self.client.send(request).await?;
1647        Ok(())
1648    }
1649
1650    /// Tries to decrypt a room event.
1651    ///
1652    /// # Arguments
1653    /// * `event` - The room event to be decrypted.
1654    ///
1655    /// Returns the decrypted event. In the case of a decryption error, returns
1656    /// a `TimelineEvent` representing the decryption error.
1657    #[cfg(feature = "e2e-encryption")]
1658    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1659    pub async fn decrypt_event(
1660        &self,
1661        event: &Raw<OriginalSyncRoomEncryptedEvent>,
1662        push_ctx: Option<&PushContext>,
1663    ) -> Result<TimelineEvent> {
1664        let machine = self.client.olm_machine().await;
1665        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1666
1667        match machine
1668            .try_decrypt_room_event(
1669                event.cast_ref(),
1670                self.inner.room_id(),
1671                self.client.decryption_settings(),
1672            )
1673            .await?
1674        {
1675            RoomEventDecryptionResult::Decrypted(decrypted) => {
1676                let push_actions = if let Some(push_ctx) = push_ctx {
1677                    Some(push_ctx.for_event(&decrypted.event).await)
1678                } else {
1679                    None
1680                };
1681                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1682            }
1683            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1684                self.client
1685                    .encryption()
1686                    .backups()
1687                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1688                Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1689            }
1690        }
1691    }
1692
1693    /// Tries to decrypt a room event.
1694    ///
1695    /// # Arguments
1696    /// * `event` - The room event to be decrypted.
1697    ///
1698    /// Returns the decrypted event. In the case of a decryption error, returns
1699    /// a `TimelineEvent` representing the decryption error.
1700    #[cfg(feature = "experimental-encrypted-state-events")]
1701    pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1702        &self,
1703        event: &Raw<T>,
1704        push_ctx: Option<&PushContext>,
1705    ) -> Result<TimelineEvent> {
1706        let machine = self.client.olm_machine().await;
1707        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1708
1709        match machine
1710            .try_decrypt_room_event(
1711                event.cast_ref(),
1712                self.inner.room_id(),
1713                self.client.decryption_settings(),
1714            )
1715            .await?
1716        {
1717            RoomEventDecryptionResult::Decrypted(decrypted) => {
1718                let push_actions = if let Some(push_ctx) = push_ctx {
1719                    Some(push_ctx.for_event(&decrypted.event).await)
1720                } else {
1721                    None
1722                };
1723                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1724            }
1725            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1726                self.client
1727                    .encryption()
1728                    .backups()
1729                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1730                // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1731                // event.
1732                Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1733            }
1734        }
1735    }
1736
1737    /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1738    /// session_id.
1739    ///
1740    /// This may be used when we receive an update for a session, and we want to
1741    /// reflect the changes in messages we have received that were encrypted
1742    /// with that session, e.g. to remove a warning shield because a device is
1743    /// now verified.
1744    ///
1745    /// # Arguments
1746    /// * `session_id` - The ID of the Megolm session to get information for.
1747    /// * `sender` - The (claimed) sender of the event where the session was
1748    ///   used.
1749    #[cfg(feature = "e2e-encryption")]
1750    pub async fn get_encryption_info(
1751        &self,
1752        session_id: &str,
1753        sender: &UserId,
1754    ) -> Option<Arc<EncryptionInfo>> {
1755        let machine = self.client.olm_machine().await;
1756        let machine = machine.as_ref()?;
1757        machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1758    }
1759
1760    /// Forces the currently active room key, which is used to encrypt messages,
1761    /// to be rotated.
1762    ///
1763    /// A new room key will be crated and shared with all the room members the
1764    /// next time a message will be sent. You don't have to call this method,
1765    /// room keys will be rotated automatically when necessary. This method is
1766    /// still useful for debugging purposes.
1767    ///
1768    /// For more info please take a look a the [`encryption`] module
1769    /// documentation.
1770    ///
1771    /// [`encryption`]: crate::encryption
1772    #[cfg(feature = "e2e-encryption")]
1773    pub async fn discard_room_key(&self) -> Result<()> {
1774        let machine = self.client.olm_machine().await;
1775        if let Some(machine) = machine.as_ref() {
1776            machine.discard_room_key(self.inner.room_id()).await?;
1777            Ok(())
1778        } else {
1779            Err(Error::NoOlmMachine)
1780        }
1781    }
1782
1783    /// Ban the user with `UserId` from this room.
1784    ///
1785    /// # Arguments
1786    ///
1787    /// * `user_id` - The user to ban with `UserId`.
1788    ///
1789    /// * `reason` - The reason for banning this user.
1790    #[instrument(skip_all)]
1791    pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1792        let request = assign!(
1793            ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1794            { reason: reason.map(ToOwned::to_owned) }
1795        );
1796        self.client.send(request).await?;
1797        Ok(())
1798    }
1799
1800    /// Unban the user with `UserId` from this room.
1801    ///
1802    /// # Arguments
1803    ///
1804    /// * `user_id` - The user to unban with `UserId`.
1805    ///
1806    /// * `reason` - The reason for unbanning this user.
1807    #[instrument(skip_all)]
1808    pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1809        let request = assign!(
1810            unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1811            { reason: reason.map(ToOwned::to_owned) }
1812        );
1813        self.client.send(request).await?;
1814        Ok(())
1815    }
1816
1817    /// Kick a user out of this room.
1818    ///
1819    /// # Arguments
1820    ///
1821    /// * `user_id` - The `UserId` of the user that should be kicked out of the
1822    ///   room.
1823    ///
1824    /// * `reason` - Optional reason why the room member is being kicked out.
1825    #[instrument(skip_all)]
1826    pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1827        let request = assign!(
1828            kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1829            { reason: reason.map(ToOwned::to_owned) }
1830        );
1831        self.client.send(request).await?;
1832        Ok(())
1833    }
1834
1835    /// Invite the specified user by `UserId` to this room.
1836    ///
1837    /// # Arguments
1838    ///
1839    /// * `user_id` - The `UserId` of the user to invite to the room.
1840    #[instrument(skip_all)]
1841    pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1842        #[cfg(feature = "e2e-encryption")]
1843        if self.client.inner.enable_share_history_on_invite {
1844            shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1845        }
1846
1847        let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1848        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1849        self.client.send(request).await?;
1850
1851        // Force a future room members reload before sending any event to prevent UTDs
1852        // that can happen when some event is sent after a room member has been invited
1853        // but before the /sync request could fetch the membership change event.
1854        self.mark_members_missing();
1855
1856        Ok(())
1857    }
1858
1859    /// Invite the specified user by third party id to this room.
1860    ///
1861    /// # Arguments
1862    ///
1863    /// * `invite_id` - A third party id of a user to invite to the room.
1864    #[instrument(skip_all)]
1865    pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1866        let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1867        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1868        self.client.send(request).await?;
1869
1870        // Force a future room members reload before sending any event to prevent UTDs
1871        // that can happen when some event is sent after a room member has been invited
1872        // but before the /sync request could fetch the membership change event.
1873        self.mark_members_missing();
1874
1875        Ok(())
1876    }
1877
1878    /// Activate typing notice for this room.
1879    ///
1880    /// The typing notice remains active for 4s. It can be deactivate at any
1881    /// point by setting typing to `false`. If this method is called while
1882    /// the typing notice is active nothing will happen. This method can be
1883    /// called on every key stroke, since it will do nothing while typing is
1884    /// active.
1885    ///
1886    /// # Arguments
1887    ///
1888    /// * `typing` - Whether the user is typing or has stopped typing.
1889    ///
1890    /// # Examples
1891    ///
1892    /// ```no_run
1893    /// use std::time::Duration;
1894    ///
1895    /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
1896    /// # use matrix_sdk::{
1897    /// #     Client, config::SyncSettings,
1898    /// #     ruma::room_id,
1899    /// # };
1900    /// # use url::Url;
1901    ///
1902    /// # async {
1903    /// # let homeserver = Url::parse("http://localhost:8080")?;
1904    /// # let client = Client::new(homeserver).await?;
1905    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
1906    ///
1907    /// if let Some(room) = client.get_room(&room_id) {
1908    ///     room.typing_notice(true).await?
1909    /// }
1910    /// # anyhow::Ok(()) };
1911    /// ```
1912    pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1913        self.ensure_room_joined()?;
1914
1915        // Only send a request to the homeserver if the old timeout has elapsed
1916        // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
1917        let send = if let Some(typing_time) =
1918            self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1919        {
1920            if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1921                // We always reactivate the typing notice if typing is true or
1922                // we may need to deactivate it if it's
1923                // currently active if typing is false
1924                typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1925            } else {
1926                // Only send a request when we need to deactivate typing
1927                !typing
1928            }
1929        } else {
1930            // Typing notice is currently deactivated, therefore, send a request
1931            // only when it's about to be activated
1932            typing
1933        };
1934
1935        if send {
1936            self.send_typing_notice(typing).await?;
1937        }
1938
1939        Ok(())
1940    }
1941
1942    #[instrument(name = "typing_notice", skip(self))]
1943    async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1944        let typing = if typing {
1945            self.client
1946                .inner
1947                .typing_notice_times
1948                .write()
1949                .unwrap()
1950                .insert(self.room_id().to_owned(), Instant::now());
1951            Typing::Yes(TYPING_NOTICE_TIMEOUT)
1952        } else {
1953            self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1954            Typing::No
1955        };
1956
1957        let request = create_typing_event::v3::Request::new(
1958            self.own_user_id().to_owned(),
1959            self.room_id().to_owned(),
1960            typing,
1961        );
1962
1963        self.client.send(request).await?;
1964
1965        Ok(())
1966    }
1967
1968    /// Send a request to set a single receipt.
1969    ///
1970    /// If an unthreaded receipt is sent, this will also unset the unread flag
1971    /// of the room if necessary.
1972    ///
1973    /// # Arguments
1974    ///
1975    /// * `receipt_type` - The type of the receipt to set. Note that it is
1976    ///   possible to set the fully-read marker although it is technically not a
1977    ///   receipt.
1978    ///
1979    /// * `thread` - The thread where this receipt should apply, if any. Note
1980    ///   that this must be [`ReceiptThread::Unthreaded`] when sending a
1981    ///   [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
1982    ///
1983    /// * `event_id` - The `EventId` of the event to set the receipt on.
1984    #[instrument(skip_all)]
1985    pub async fn send_single_receipt(
1986        &self,
1987        receipt_type: create_receipt::v3::ReceiptType,
1988        thread: ReceiptThread,
1989        event_id: OwnedEventId,
1990    ) -> Result<()> {
1991        // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
1992        // string key.
1993        let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1994
1995        self.client
1996            .inner
1997            .locks
1998            .read_receipt_deduplicated_handler
1999            .run((request_key, event_id.clone()), async {
2000                // We will unset the unread flag if we send an unthreaded receipt.
2001                let is_unthreaded = thread == ReceiptThread::Unthreaded;
2002
2003                let mut request = create_receipt::v3::Request::new(
2004                    self.room_id().to_owned(),
2005                    receipt_type,
2006                    event_id,
2007                );
2008                request.thread = thread;
2009
2010                self.client.send(request).await?;
2011
2012                if is_unthreaded {
2013                    self.set_unread_flag(false).await?;
2014                }
2015
2016                Ok(())
2017            })
2018            .await
2019    }
2020
2021    /// Send a request to set multiple receipts at once.
2022    ///
2023    /// This will also unset the unread flag of the room if necessary.
2024    ///
2025    /// # Arguments
2026    ///
2027    /// * `receipts` - The `Receipts` to send.
2028    ///
2029    /// If `receipts` is empty, this is a no-op.
2030    #[instrument(skip_all)]
2031    pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2032        if receipts.is_empty() {
2033            return Ok(());
2034        }
2035
2036        let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2037        let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2038            fully_read,
2039            read_receipt: public_read_receipt,
2040            private_read_receipt,
2041        });
2042
2043        self.client.send(request).await?;
2044
2045        self.set_unread_flag(false).await?;
2046
2047        Ok(())
2048    }
2049
2050    /// Helper function to enable End-to-end encryption in this room.
2051    /// `encrypted_state_events` is not used unless the
2052    /// `experimental-encrypted-state-events` feature is enabled.
2053    #[allow(unused_variables, unused_mut)]
2054    async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2055        use ruma::{
2056            EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2057        };
2058        const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2059
2060        if !self.latest_encryption_state().await?.is_encrypted() {
2061            let mut content =
2062                RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2063            #[cfg(feature = "experimental-encrypted-state-events")]
2064            if encrypted_state_events {
2065                content = content.with_encrypted_state();
2066            }
2067            self.send_state_event(content).await?;
2068
2069            // Spin on the sync beat event, since the first sync we receive might not
2070            // include the encryption event.
2071            //
2072            // TODO do we want to return an error here if we time out? This
2073            // could be quite useful if someone wants to enable encryption and
2074            // send a message right after it's enabled.
2075            let res = timeout(
2076                async {
2077                    loop {
2078                        // Listen for sync events, then check if the encryption state is known.
2079                        self.client.inner.sync_beat.listen().await;
2080                        let _state_store_lock =
2081                            self.client.base_client().state_store_lock().lock().await;
2082
2083                        if !self.inner.encryption_state().is_unknown() {
2084                            break;
2085                        }
2086                    }
2087                },
2088                SYNC_WAIT_TIME,
2089            )
2090            .await;
2091
2092            let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2093
2094            // If encryption was enabled, return.
2095            #[cfg(not(feature = "experimental-encrypted-state-events"))]
2096            if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2097                debug!("room successfully marked as encrypted");
2098                return Ok(());
2099            }
2100
2101            // If encryption with state event encryption was enabled, return.
2102            #[cfg(feature = "experimental-encrypted-state-events")]
2103            if res.is_ok() && {
2104                if encrypted_state_events {
2105                    self.inner.encryption_state().is_state_encrypted()
2106                } else {
2107                    self.inner.encryption_state().is_encrypted()
2108                }
2109            } {
2110                debug!("room successfully marked as encrypted");
2111                return Ok(());
2112            }
2113
2114            // If after waiting for multiple syncs, we don't have the encryption state we
2115            // expect, assume the local encryption state is incorrect; this will
2116            // cause the SDK to re-request it later for confirmation, instead of
2117            // assuming it's sync'd and correct (and not encrypted).
2118            debug!("still not marked as encrypted, marking encryption state as missing");
2119
2120            let mut room_info = self.clone_info();
2121            room_info.mark_encryption_state_missing();
2122            let mut changes = StateChanges::default();
2123            changes.add_room(room_info.clone());
2124
2125            self.client.state_store().save_changes(&changes).await?;
2126            self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2127        }
2128
2129        Ok(())
2130    }
2131
2132    /// Enable End-to-end encryption in this room.
2133    ///
2134    /// This method will be a noop if encryption is already enabled, otherwise
2135    /// sends a `m.room.encryption` state event to the room. This might fail if
2136    /// you don't have the appropriate power level to enable end-to-end
2137    /// encryption.
2138    ///
2139    /// A sync needs to be received to update the local room state. This method
2140    /// will wait for a sync to be received, this might time out if no
2141    /// sync loop is running or if the server is slow.
2142    ///
2143    /// # Examples
2144    ///
2145    /// ```no_run
2146    /// # use matrix_sdk::{
2147    /// #     Client, config::SyncSettings,
2148    /// #     ruma::room_id,
2149    /// # };
2150    /// # use url::Url;
2151    /// #
2152    /// # async {
2153    /// # let homeserver = Url::parse("http://localhost:8080")?;
2154    /// # let client = Client::new(homeserver).await?;
2155    /// # let room_id = room_id!("!test:localhost");
2156    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2157    ///
2158    /// if let Some(room) = client.get_room(&room_id) {
2159    ///     room.enable_encryption().await?
2160    /// }
2161    /// # anyhow::Ok(()) };
2162    /// ```
2163    #[instrument(skip_all)]
2164    pub async fn enable_encryption(&self) -> Result<()> {
2165        self.enable_encryption_inner(false).await
2166    }
2167
2168    /// Enable End-to-end encryption in this room, opting into experimental
2169    /// state event encryption.
2170    ///
2171    /// This method will be a noop if encryption is already enabled, otherwise
2172    /// sends a `m.room.encryption` state event to the room. This might fail if
2173    /// you don't have the appropriate power level to enable end-to-end
2174    /// encryption.
2175    ///
2176    /// A sync needs to be received to update the local room state. This method
2177    /// will wait for a sync to be received, this might time out if no
2178    /// sync loop is running or if the server is slow.
2179    ///
2180    /// # Examples
2181    ///
2182    /// ```no_run
2183    /// # use matrix_sdk::{
2184    /// #     Client, config::SyncSettings,
2185    /// #     ruma::room_id,
2186    /// # };
2187    /// # use url::Url;
2188    /// #
2189    /// # async {
2190    /// # let homeserver = Url::parse("http://localhost:8080")?;
2191    /// # let client = Client::new(homeserver).await?;
2192    /// # let room_id = room_id!("!test:localhost");
2193    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2194    ///
2195    /// if let Some(room) = client.get_room(&room_id) {
2196    ///     room.enable_encryption_with_state_event_encryption().await?
2197    /// }
2198    /// # anyhow::Ok(()) };
2199    /// ```
2200    #[instrument(skip_all)]
2201    #[cfg(feature = "experimental-encrypted-state-events")]
2202    pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2203        self.enable_encryption_inner(true).await
2204    }
2205
2206    /// Share a room key with users in the given room.
2207    ///
2208    /// This will create Olm sessions with all the users/device pairs in the
2209    /// room if necessary and share a room key that can be shared with them.
2210    ///
2211    /// Does nothing if no room key needs to be shared.
2212    // TODO: expose this publicly so people can pre-share a group session if
2213    // e.g. a user starts to type a message for a room.
2214    #[cfg(feature = "e2e-encryption")]
2215    #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
2216    async fn preshare_room_key(&self) -> Result<()> {
2217        self.ensure_room_joined()?;
2218
2219        // Take and release the lock on the store, if needs be.
2220        let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2221        tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
2222
2223        self.client
2224            .locks()
2225            .group_session_deduplicated_handler
2226            .run(self.room_id().to_owned(), async move {
2227                {
2228                    let members = self
2229                        .client
2230                        .state_store()
2231                        .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2232                        .await?;
2233                    self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2234                };
2235
2236                let response = self.share_room_key().await;
2237
2238                // If one of the responses failed invalidate the group
2239                // session as using it would end up in undecryptable
2240                // messages.
2241                if let Err(r) = response {
2242                    let machine = self.client.olm_machine().await;
2243                    if let Some(machine) = machine.as_ref() {
2244                        machine.discard_room_key(self.room_id()).await?;
2245                    }
2246                    return Err(r);
2247                }
2248
2249                Ok(())
2250            })
2251            .await
2252    }
2253
2254    /// Share a group session for a room.
2255    ///
2256    /// # Panics
2257    ///
2258    /// Panics if the client isn't logged in.
2259    #[cfg(feature = "e2e-encryption")]
2260    #[instrument(skip_all)]
2261    async fn share_room_key(&self) -> Result<()> {
2262        self.ensure_room_joined()?;
2263
2264        let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2265
2266        for request in requests {
2267            let response = self.client.send_to_device(&request).await?;
2268            self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2269        }
2270
2271        Ok(())
2272    }
2273
2274    /// Wait for the room to be fully synced.
2275    ///
2276    /// This method makes sure the room that was returned when joining a room
2277    /// has been echoed back in the sync.
2278    ///
2279    /// Warning: This waits until a sync happens and does not return if no sync
2280    /// is happening. It can also return early when the room is not a joined
2281    /// room anymore.
2282    #[instrument(skip_all)]
2283    pub async fn sync_up(&self) {
2284        while !self.is_synced() && self.state() == RoomState::Joined {
2285            let wait_for_beat = self.client.inner.sync_beat.listen();
2286            // We don't care whether it's a timeout or a sync beat.
2287            let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2288        }
2289    }
2290
2291    /// Send a message-like event to this room.
2292    ///
2293    /// Returns the parsed response from the server.
2294    ///
2295    /// If the encryption feature is enabled this method will transparently
2296    /// encrypt the event if this room is encrypted (except for `m.reaction`
2297    /// events, which are never encrypted).
2298    ///
2299    /// **Note**: If you just want to send an event with custom JSON content to
2300    /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2301    ///
2302    /// If you want to set a transaction ID for the event, use
2303    /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2304    /// on the returned value before `.await`ing it.
2305    ///
2306    /// # Arguments
2307    ///
2308    /// * `content` - The content of the message event.
2309    ///
2310    /// # Examples
2311    ///
2312    /// ```no_run
2313    /// # use std::sync::{Arc, RwLock};
2314    /// # use matrix_sdk::{Client, config::SyncSettings};
2315    /// # use url::Url;
2316    /// # use matrix_sdk::ruma::room_id;
2317    /// # use serde::{Deserialize, Serialize};
2318    /// use matrix_sdk::ruma::{
2319    ///     MilliSecondsSinceUnixEpoch, TransactionId,
2320    ///     events::{
2321    ///         macros::EventContent,
2322    ///         room::message::{RoomMessageEventContent, TextMessageEventContent},
2323    ///     },
2324    ///     uint,
2325    /// };
2326    ///
2327    /// # async {
2328    /// # let homeserver = Url::parse("http://localhost:8080")?;
2329    /// # let mut client = Client::new(homeserver).await?;
2330    /// # let room_id = room_id!("!test:localhost");
2331    /// let content = RoomMessageEventContent::text_plain("Hello world");
2332    /// let txn_id = TransactionId::new();
2333    ///
2334    /// if let Some(room) = client.get_room(&room_id) {
2335    ///     room.send(content).with_transaction_id(txn_id).await?;
2336    /// }
2337    ///
2338    /// // Custom events work too:
2339    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2340    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2341    /// struct TokenEventContent {
2342    ///     token: String,
2343    ///     #[serde(rename = "exp")]
2344    ///     expires_at: MilliSecondsSinceUnixEpoch,
2345    /// }
2346    ///
2347    /// # fn generate_token() -> String { todo!() }
2348    /// let content = TokenEventContent {
2349    ///     token: generate_token(),
2350    ///     expires_at: {
2351    ///         let now = MilliSecondsSinceUnixEpoch::now();
2352    ///         MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2353    ///     },
2354    /// };
2355    ///
2356    /// if let Some(room) = client.get_room(&room_id) {
2357    ///     room.send(content).await?;
2358    /// }
2359    /// # anyhow::Ok(()) };
2360    /// ```
2361    pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2362        SendMessageLikeEvent::new(self, content)
2363    }
2364
2365    /// Run /keys/query requests for all the non-tracked users, and for users
2366    /// with an out-of-date device list.
2367    #[cfg(feature = "e2e-encryption")]
2368    async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2369        let olm = self.client.olm_machine().await;
2370        let olm = olm.as_ref().expect("Olm machine wasn't started");
2371
2372        let members =
2373            self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2374
2375        let tracked: HashMap<_, _> = olm
2376            .store()
2377            .load_tracked_users()
2378            .await?
2379            .into_iter()
2380            .map(|tracked| (tracked.user_id, tracked.dirty))
2381            .collect();
2382
2383        // A member has no unknown devices iff it was tracked *and* the tracking is
2384        // not considered dirty.
2385        let members_with_unknown_devices =
2386            members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2387
2388        let (req_id, request) =
2389            olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2390
2391        if !request.device_keys.is_empty() {
2392            self.client.keys_query(&req_id, request.device_keys).await?;
2393        }
2394
2395        Ok(())
2396    }
2397
2398    /// Send a message-like event with custom JSON content to this room.
2399    ///
2400    /// Returns the parsed response from the server.
2401    ///
2402    /// If the encryption feature is enabled this method will transparently
2403    /// encrypt the event if this room is encrypted (except for `m.reaction`
2404    /// events, which are never encrypted).
2405    ///
2406    /// This method is equivalent to the [`send()`][Self::send] method but
2407    /// allows sending custom JSON payloads, e.g. constructed using the
2408    /// [`serde_json::json!()`] macro.
2409    ///
2410    /// If you want to set a transaction ID for the event, use
2411    /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2412    /// on the returned value before `.await`ing it.
2413    ///
2414    /// # Arguments
2415    ///
2416    /// * `event_type` - The type of the event.
2417    ///
2418    /// * `content` - The content of the event as a raw JSON value. The argument
2419    ///   type can be `serde_json::Value`, but also other raw JSON types; for
2420    ///   the full list check the documentation of
2421    ///   [`IntoRawMessageLikeEventContent`].
2422    ///
2423    /// # Examples
2424    ///
2425    /// ```no_run
2426    /// # use std::sync::{Arc, RwLock};
2427    /// # use matrix_sdk::{Client, config::SyncSettings};
2428    /// # use url::Url;
2429    /// # use matrix_sdk::ruma::room_id;
2430    /// # async {
2431    /// # let homeserver = Url::parse("http://localhost:8080")?;
2432    /// # let mut client = Client::new(homeserver).await?;
2433    /// # let room_id = room_id!("!test:localhost");
2434    /// use serde_json::json;
2435    ///
2436    /// if let Some(room) = client.get_room(&room_id) {
2437    ///     room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2438    /// }
2439    /// # anyhow::Ok(()) };
2440    /// ```
2441    #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2442    pub fn send_raw<'a>(
2443        &'a self,
2444        event_type: &'a str,
2445        content: impl IntoRawMessageLikeEventContent,
2446    ) -> SendRawMessageLikeEvent<'a> {
2447        // Note: the recorded instrument fields are saved in
2448        // `SendRawMessageLikeEvent::into_future`.
2449        SendRawMessageLikeEvent::new(self, event_type, content)
2450    }
2451
2452    /// Send an attachment to this room.
2453    ///
2454    /// This will upload the given data that the reader produces using the
2455    /// [`upload()`] method and post an event to the given room.
2456    /// If the room is encrypted and the encryption feature is enabled the
2457    /// upload will be encrypted.
2458    ///
2459    /// This is a convenience method that calls the
2460    /// [`upload()`] and afterwards the [`send()`].
2461    ///
2462    /// # Arguments
2463    /// * `filename` - The file name.
2464    ///
2465    /// * `content_type` - The type of the media, this will be used as the
2466    /// content-type header.
2467    ///
2468    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2469    /// media.
2470    ///
2471    /// * `config` - Metadata and configuration for the attachment.
2472    ///
2473    /// # Examples
2474    ///
2475    /// ```no_run
2476    /// # use std::fs;
2477    /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2478    /// # use url::Url;
2479    /// # use mime;
2480    /// # async {
2481    /// # let homeserver = Url::parse("http://localhost:8080")?;
2482    /// # let mut client = Client::new(homeserver).await?;
2483    /// # let room_id = room_id!("!test:localhost");
2484    /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2485    ///
2486    /// if let Some(room) = client.get_room(&room_id) {
2487    ///     room.send_attachment(
2488    ///         "my_favorite_cat.jpg",
2489    ///         &mime::IMAGE_JPEG,
2490    ///         image,
2491    ///         AttachmentConfig::new(),
2492    ///     ).await?;
2493    /// }
2494    /// # anyhow::Ok(()) };
2495    /// ```
2496    ///
2497    /// [`upload()`]: crate::Media::upload
2498    /// [`send()`]: Self::send
2499    #[instrument(skip_all)]
2500    pub fn send_attachment<'a>(
2501        &'a self,
2502        filename: impl Into<String>,
2503        content_type: &'a Mime,
2504        data: Vec<u8>,
2505        config: AttachmentConfig,
2506    ) -> SendAttachment<'a> {
2507        SendAttachment::new(self, filename.into(), content_type, data, config)
2508    }
2509
2510    /// Prepare and send an attachment to this room.
2511    ///
2512    /// This will upload the given data that the reader produces using the
2513    /// [`upload()`](#method.upload) method and post an event to the given room.
2514    /// If the room is encrypted and the encryption feature is enabled the
2515    /// upload will be encrypted.
2516    ///
2517    /// This is a convenience method that calls the
2518    /// [`Client::upload()`](#Client::method.upload) and afterwards the
2519    /// [`send()`](#method.send).
2520    ///
2521    /// # Arguments
2522    /// * `filename` - The file name.
2523    ///
2524    /// * `content_type` - The type of the media, this will be used as the
2525    ///   content-type header.
2526    ///
2527    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2528    ///   media.
2529    ///
2530    /// * `config` - Metadata and configuration for the attachment.
2531    ///
2532    /// * `send_progress` - An observable to transmit forward progress about the
2533    ///   upload.
2534    ///
2535    /// * `store_in_cache` - A boolean defining whether the uploaded media will
2536    ///   be stored in the cache immediately after a successful upload.
2537    #[instrument(skip_all)]
2538    pub(super) async fn prepare_and_send_attachment<'a>(
2539        &'a self,
2540        filename: String,
2541        content_type: &'a Mime,
2542        data: Vec<u8>,
2543        mut config: AttachmentConfig,
2544        send_progress: SharedObservable<TransmissionProgress>,
2545        store_in_cache: bool,
2546    ) -> Result<send_message_event::v3::Response> {
2547        self.ensure_room_joined()?;
2548
2549        let txn_id = config.txn_id.take();
2550        let mentions = config.mentions.take();
2551
2552        let thumbnail = config.thumbnail.take();
2553
2554        // If necessary, store caching data for the thumbnail ahead of time.
2555        let thumbnail_cache_info = if store_in_cache {
2556            thumbnail
2557                .as_ref()
2558                .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2559        } else {
2560            None
2561        };
2562
2563        #[cfg(feature = "e2e-encryption")]
2564        let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2565            self.client
2566                .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2567                .await?
2568        } else {
2569            self.client
2570                .media()
2571                .upload_plain_media_and_thumbnail(
2572                    content_type,
2573                    // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2574                    // similar.
2575                    data.clone(),
2576                    thumbnail,
2577                    send_progress,
2578                )
2579                .await?
2580        };
2581
2582        #[cfg(not(feature = "e2e-encryption"))]
2583        let (media_source, thumbnail) = self
2584            .client
2585            .media()
2586            .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2587            .await?;
2588
2589        if store_in_cache {
2590            let media_store_lock_guard = self.client.media_store().lock().await?;
2591
2592            // A failure to cache shouldn't prevent the whole upload from finishing
2593            // properly, so only log errors during caching.
2594
2595            debug!("caching the media");
2596            let request =
2597                MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2598
2599            if let Err(err) = media_store_lock_guard
2600                .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2601                .await
2602            {
2603                warn!("unable to cache the media after uploading it: {err}");
2604            }
2605
2606            if let Some(((data, height, width), source)) =
2607                thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2608            {
2609                debug!("caching the thumbnail");
2610
2611                let request = MediaRequestParameters {
2612                    source: source.clone(),
2613                    format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2614                };
2615
2616                if let Err(err) = media_store_lock_guard
2617                    .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2618                    .await
2619                {
2620                    warn!("unable to cache the media after uploading it: {err}");
2621                }
2622            }
2623        }
2624
2625        let content = self
2626            .make_media_event(
2627                Room::make_attachment_type(
2628                    content_type,
2629                    filename,
2630                    media_source,
2631                    config.caption,
2632                    config.info,
2633                    thumbnail,
2634                ),
2635                mentions,
2636                config.reply,
2637            )
2638            .await?;
2639
2640        let mut fut = self.send(content);
2641        if let Some(txn_id) = txn_id {
2642            fut = fut.with_transaction_id(txn_id);
2643        }
2644        fut.await
2645    }
2646
2647    /// Creates the inner [`MessageType`] for an already-uploaded media file
2648    /// provided by its source.
2649    #[allow(clippy::too_many_arguments)]
2650    pub(crate) fn make_attachment_type(
2651        content_type: &Mime,
2652        filename: String,
2653        source: MediaSource,
2654        caption: Option<TextMessageEventContent>,
2655        info: Option<AttachmentInfo>,
2656        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2657    ) -> MessageType {
2658        make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2659    }
2660
2661    /// Creates the [`RoomMessageEventContent`] based on the message type,
2662    /// mentions and reply information.
2663    pub(crate) async fn make_media_event(
2664        &self,
2665        msg_type: MessageType,
2666        mentions: Option<Mentions>,
2667        reply: Option<Reply>,
2668    ) -> Result<RoomMessageEventContent> {
2669        let mut content = RoomMessageEventContent::new(msg_type);
2670        if let Some(mentions) = mentions {
2671            content = content.add_mentions(mentions);
2672        }
2673        if let Some(reply) = reply {
2674            // Since we just created the event, there is no relation attached to it. Thus,
2675            // it is safe to add the reply relation without overriding anything.
2676            content = self.make_reply_event(content.into(), reply).await?;
2677        }
2678        Ok(content)
2679    }
2680
2681    /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
2682    /// provided by its source.
2683    #[cfg(feature = "unstable-msc4274")]
2684    #[allow(clippy::too_many_arguments)]
2685    pub(crate) fn make_gallery_item_type(
2686        content_type: &Mime,
2687        filename: String,
2688        source: MediaSource,
2689        caption: Option<TextMessageEventContent>,
2690        info: Option<AttachmentInfo>,
2691        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2692    ) -> GalleryItemType {
2693        make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2694    }
2695
2696    /// Update the power levels of a select set of users of this room.
2697    ///
2698    /// Issue a `power_levels` state event request to the server, changing the
2699    /// given UserId -> Int levels. May fail if the `power_levels` aren't
2700    /// locally known yet or the server rejects the state event update, e.g.
2701    /// because of insufficient permissions. Neither permissions to update
2702    /// nor whether the data might be stale is checked prior to issuing the
2703    /// request.
2704    pub async fn update_power_levels(
2705        &self,
2706        updates: Vec<(&UserId, Int)>,
2707    ) -> Result<send_state_event::v3::Response> {
2708        let mut power_levels = self.power_levels().await?;
2709
2710        for (user_id, new_level) in updates {
2711            if new_level == power_levels.users_default {
2712                power_levels.users.remove(user_id);
2713            } else {
2714                power_levels.users.insert(user_id.to_owned(), new_level);
2715            }
2716        }
2717
2718        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2719    }
2720
2721    /// Applies a set of power level changes to this room.
2722    ///
2723    /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2724    /// remain unchanged.
2725    pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2726        let mut power_levels = self.power_levels().await?;
2727        power_levels.apply(changes)?;
2728        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2729        Ok(())
2730    }
2731
2732    /// Resets the room's power levels to the default values
2733    ///
2734    /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2735    pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2736        let creators = self.creators().unwrap_or_default();
2737        let rules = self.clone_info().room_version_rules_or_default();
2738
2739        let default_power_levels =
2740            RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2741        let changes = RoomPowerLevelChanges::from(default_power_levels);
2742        self.apply_power_level_changes(changes).await?;
2743        Ok(self.power_levels().await?)
2744    }
2745
2746    /// Gets the suggested role for the user with the provided `user_id`.
2747    ///
2748    /// This method checks the `RoomPowerLevels` events instead of loading the
2749    /// member list and looking for the member.
2750    pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2751        let power_level = self.get_user_power_level(user_id).await?;
2752        Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2753    }
2754
2755    /// Gets the power level the user with the provided `user_id`.
2756    ///
2757    /// This method checks the `RoomPowerLevels` events instead of loading the
2758    /// member list and looking for the member.
2759    pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2760        let event = self.power_levels().await?;
2761        Ok(event.for_user(user_id))
2762    }
2763
2764    /// Gets a map with the `UserId` of users with power levels other than `0`
2765    /// and this power level.
2766    pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2767        let power_levels = self.power_levels().await.ok();
2768        let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2769        if let Some(power_levels) = power_levels {
2770            for (id, level) in power_levels.users.into_iter() {
2771                user_power_levels.insert(id, level.into());
2772            }
2773        }
2774        user_power_levels
2775    }
2776
2777    /// Sets the name of this room.
2778    pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2779        self.send_state_event(RoomNameEventContent::new(name)).await
2780    }
2781
2782    /// Sets a new topic for this room.
2783    pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2784        self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2785    }
2786
2787    /// Sets the new avatar url for this room.
2788    ///
2789    /// # Arguments
2790    /// * `avatar_url` - The owned Matrix uri that represents the avatar
2791    /// * `info` - The optional image info that can be provided for the avatar
2792    pub async fn set_avatar_url(
2793        &self,
2794        url: &MxcUri,
2795        info: Option<avatar::ImageInfo>,
2796    ) -> Result<send_state_event::v3::Response> {
2797        self.ensure_room_joined()?;
2798
2799        let mut room_avatar_event = RoomAvatarEventContent::new();
2800        room_avatar_event.url = Some(url.to_owned());
2801        room_avatar_event.info = info.map(Box::new);
2802
2803        self.send_state_event(room_avatar_event).await
2804    }
2805
2806    /// Removes the avatar from the room
2807    pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2808        self.send_state_event(RoomAvatarEventContent::new()).await
2809    }
2810
2811    /// Uploads a new avatar for this room.
2812    ///
2813    /// # Arguments
2814    /// * `mime` - The mime type describing the data
2815    /// * `data` - The data representation of the avatar
2816    /// * `info` - The optional image info provided for the avatar, the blurhash
2817    ///   and the mimetype will always be updated
2818    pub async fn upload_avatar(
2819        &self,
2820        mime: &Mime,
2821        data: Vec<u8>,
2822        info: Option<avatar::ImageInfo>,
2823    ) -> Result<send_state_event::v3::Response> {
2824        self.ensure_room_joined()?;
2825
2826        let upload_response = self.client.media().upload(mime, data, None).await?;
2827        let mut info = info.unwrap_or_default();
2828        info.blurhash = upload_response.blurhash;
2829        info.mimetype = Some(mime.to_string());
2830
2831        self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2832    }
2833
2834    /// Send a state event with an empty state key to the homeserver.
2835    ///
2836    /// For state events with a non-empty state key, see
2837    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
2838    ///
2839    /// Returns the parsed response from the server.
2840    ///
2841    /// # Arguments
2842    ///
2843    /// * `content` - The content of the state event.
2844    ///
2845    /// # Examples
2846    ///
2847    /// ```no_run
2848    /// # use serde::{Deserialize, Serialize};
2849    /// # async {
2850    /// # let joined_room: matrix_sdk::Room = todo!();
2851    /// use matrix_sdk::ruma::{
2852    ///     EventEncryptionAlgorithm,
2853    ///     events::{
2854    ///         EmptyStateKey, macros::EventContent,
2855    ///         room::encryption::RoomEncryptionEventContent,
2856    ///     },
2857    /// };
2858    ///
2859    /// let encryption_event_content = RoomEncryptionEventContent::new(
2860    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
2861    /// );
2862    /// joined_room.send_state_event(encryption_event_content).await?;
2863    ///
2864    /// // Custom event:
2865    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2866    /// #[ruma_event(
2867    ///     type = "org.matrix.msc_9000.xxx",
2868    ///     kind = State,
2869    ///     state_key_type = EmptyStateKey,
2870    /// )]
2871    /// struct XxxStateEventContent {/* fields... */}
2872    ///
2873    /// let content: XxxStateEventContent = todo!();
2874    /// joined_room.send_state_event(content).await?;
2875    /// # anyhow::Ok(()) };
2876    /// ```
2877    #[cfg(not(feature = "experimental-encrypted-state-events"))]
2878    #[instrument(skip_all)]
2879    pub async fn send_state_event(
2880        &self,
2881        content: impl StateEventContent<StateKey = EmptyStateKey>,
2882    ) -> Result<send_state_event::v3::Response> {
2883        self.send_state_event_for_key(&EmptyStateKey, content).await
2884    }
2885
2886    /// Send a state event with an empty state key to the homeserver.
2887    ///
2888    /// For state events with a non-empty state key, see
2889    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
2890    ///
2891    /// If the experimental state event encryption feature is enabled, this
2892    /// method will transparently encrypt the event if this room is
2893    /// encrypted (except if the event type is considered critical for the room
2894    /// to function, as outlined in [MSC3414][msc3414]).
2895    ///
2896    /// Returns the parsed response from the server.
2897    ///
2898    /// # Arguments
2899    ///
2900    /// * `content` - The content of the state event.
2901    ///
2902    /// # Examples
2903    ///
2904    /// ```no_run
2905    /// # use serde::{Deserialize, Serialize};
2906    /// # async {
2907    /// # let joined_room: matrix_sdk::Room = todo!();
2908    /// use matrix_sdk::ruma::{
2909    ///     EventEncryptionAlgorithm,
2910    ///     events::{
2911    ///         EmptyStateKey, macros::EventContent,
2912    ///         room::encryption::RoomEncryptionEventContent,
2913    ///     },
2914    /// };
2915    ///
2916    /// let encryption_event_content = RoomEncryptionEventContent::new(
2917    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
2918    /// );
2919    /// joined_room.send_state_event(encryption_event_content).await?;
2920    ///
2921    /// // Custom event:
2922    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2923    /// #[ruma_event(
2924    ///     type = "org.matrix.msc_9000.xxx",
2925    ///     kind = State,
2926    ///     state_key_type = EmptyStateKey,
2927    /// )]
2928    /// struct XxxStateEventContent {/* fields... */}
2929    ///
2930    /// let content: XxxStateEventContent = todo!();
2931    /// joined_room.send_state_event(content).await?;
2932    /// # anyhow::Ok(()) };
2933    /// ```
2934    ///
2935    /// [msc3414]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/3414-encrypted-state.md
2936    #[cfg(feature = "experimental-encrypted-state-events")]
2937    #[instrument(skip_all)]
2938    pub fn send_state_event<'a>(
2939        &'a self,
2940        content: impl StateEventContent<StateKey = EmptyStateKey>,
2941    ) -> SendStateEvent<'a> {
2942        self.send_state_event_for_key(&EmptyStateKey, content)
2943    }
2944
2945    /// Send a state event to the homeserver.
2946    ///
2947    /// Returns the parsed response from the server.
2948    ///
2949    /// # Arguments
2950    ///
2951    /// * `content` - The content of the state event.
2952    ///
2953    /// * `state_key` - A unique key which defines the overwriting semantics for
2954    ///   this piece of room state.
2955    ///
2956    /// # Examples
2957    ///
2958    /// ```no_run
2959    /// # use serde::{Deserialize, Serialize};
2960    /// # async {
2961    /// # let joined_room: matrix_sdk::Room = todo!();
2962    /// use matrix_sdk::ruma::{
2963    ///     events::{
2964    ///         macros::EventContent,
2965    ///         room::member::{RoomMemberEventContent, MembershipState},
2966    ///     },
2967    ///     mxc_uri,
2968    /// };
2969    ///
2970    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
2971    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
2972    /// content.avatar_url = Some(avatar_url);
2973    ///
2974    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
2975    ///
2976    /// // Custom event:
2977    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2978    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
2979    /// struct XxxStateEventContent { /* fields... */ }
2980    ///
2981    /// let content: XxxStateEventContent = todo!();
2982    /// joined_room.send_state_event_for_key("foo", content).await?;
2983    /// # anyhow::Ok(()) };
2984    /// ```
2985    #[cfg(not(feature = "experimental-encrypted-state-events"))]
2986    pub async fn send_state_event_for_key<C, K>(
2987        &self,
2988        state_key: &K,
2989        content: C,
2990    ) -> Result<send_state_event::v3::Response>
2991    where
2992        C: StateEventContent,
2993        C::StateKey: Borrow<K>,
2994        K: AsRef<str> + ?Sized,
2995    {
2996        self.ensure_room_joined()?;
2997        let request =
2998            send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
2999        let response = self.client.send(request).await?;
3000        Ok(response)
3001    }
3002
3003    /// Send a state event to the homeserver. If state encryption is enabled in
3004    /// this room, the event will be encrypted.
3005    ///
3006    /// If the experimental state event encryption feature is enabled, this
3007    /// method will transparently encrypt the event if this room is
3008    /// encrypted (except if the event type is considered critical for the room
3009    /// to function, as outlined in [MSC3414][msc3414]).
3010    ///
3011    /// Returns the parsed response from the server.
3012    ///
3013    /// # Arguments
3014    ///
3015    /// * `content` - The content of the state event.
3016    ///
3017    /// * `state_key` - A unique key which defines the overwriting semantics for
3018    ///   this piece of room state.
3019    ///
3020    /// # Examples
3021    ///
3022    /// ```no_run
3023    /// # use serde::{Deserialize, Serialize};
3024    /// # async {
3025    /// # let joined_room: matrix_sdk::Room = todo!();
3026    /// use matrix_sdk::ruma::{
3027    ///     events::{
3028    ///         macros::EventContent,
3029    ///         room::member::{RoomMemberEventContent, MembershipState},
3030    ///     },
3031    ///     mxc_uri,
3032    /// };
3033    ///
3034    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3035    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3036    /// content.avatar_url = Some(avatar_url);
3037    ///
3038    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3039    ///
3040    /// // Custom event:
3041    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3042    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3043    /// struct XxxStateEventContent { /* fields... */ }
3044    ///
3045    /// let content: XxxStateEventContent = todo!();
3046    /// joined_room.send_state_event_for_key("foo", content).await?;
3047    /// # anyhow::Ok(()) };
3048    /// ```
3049    ///
3050    /// [msc3414]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/3414-encrypted-state.md
3051    #[cfg(feature = "experimental-encrypted-state-events")]
3052    pub fn send_state_event_for_key<'a, C, K>(
3053        &'a self,
3054        state_key: &K,
3055        content: C,
3056    ) -> SendStateEvent<'a>
3057    where
3058        C: StateEventContent,
3059        C::StateKey: Borrow<K>,
3060        K: AsRef<str> + ?Sized,
3061    {
3062        SendStateEvent::new(self, state_key, content)
3063    }
3064
3065    /// Send a raw room state event to the homeserver.
3066    ///
3067    /// Returns the parsed response from the server.
3068    ///
3069    /// # Arguments
3070    ///
3071    /// * `event_type` - The type of the event that we're sending out.
3072    ///
3073    /// * `state_key` - A unique key which defines the overwriting semantics for
3074    /// this piece of room state. This value is often a zero-length string.
3075    ///
3076    /// * `content` - The content of the event as a raw JSON value. The argument
3077    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3078    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3079    ///
3080    /// # Examples
3081    ///
3082    /// ```no_run
3083    /// use serde_json::json;
3084    ///
3085    /// # async {
3086    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3087    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3088    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3089    ///
3090    /// if let Some(room) = client.get_room(&room_id) {
3091    ///     room.send_state_event_raw("m.room.member", "", json!({
3092    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3093    ///         "displayname": "Alice Margatroid",
3094    ///         "membership": "join",
3095    ///     })).await?;
3096    /// }
3097    /// # anyhow::Ok(()) };
3098    /// ```
3099    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3100    #[instrument(skip_all)]
3101    pub async fn send_state_event_raw(
3102        &self,
3103        event_type: &str,
3104        state_key: &str,
3105        content: impl IntoRawStateEventContent,
3106    ) -> Result<send_state_event::v3::Response> {
3107        self.ensure_room_joined()?;
3108
3109        let request = send_state_event::v3::Request::new_raw(
3110            self.room_id().to_owned(),
3111            event_type.into(),
3112            state_key.to_owned(),
3113            content.into_raw_state_event_content(),
3114        );
3115
3116        Ok(self.client.send(request).await?)
3117    }
3118
3119    /// Send a raw room state event to the homeserver.
3120    ///
3121    /// If the experimental state event encryption feature is enabled, this
3122    /// method will transparently encrypt the event if this room is
3123    /// encrypted (except if the event type is considered critical for the room
3124    /// to function, as outlined in [MSC3414][msc3414]).
3125    ///
3126    /// Returns the parsed response from the server.
3127    ///
3128    /// # Arguments
3129    ///
3130    /// * `event_type` - The type of the event that we're sending out.
3131    ///
3132    /// * `state_key` - A unique key which defines the overwriting semantics for
3133    /// this piece of room state. This value is often a zero-length string.
3134    ///
3135    /// * `content` - The content of the event as a raw JSON value. The argument
3136    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3137    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3138    ///
3139    /// # Examples
3140    ///
3141    /// ```no_run
3142    /// use serde_json::json;
3143    ///
3144    /// # async {
3145    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3146    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3147    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3148    ///
3149    /// if let Some(room) = client.get_room(&room_id) {
3150    ///     room.send_state_event_raw("m.room.member", "", json!({
3151    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3152    ///         "displayname": "Alice Margatroid",
3153    ///         "membership": "join",
3154    ///     })).await?;
3155    /// }
3156    /// # anyhow::Ok(()) };
3157    /// ```
3158    ///
3159    /// [msc3414]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/3414-encrypted-state.md
3160    #[cfg(feature = "experimental-encrypted-state-events")]
3161    #[instrument(skip_all)]
3162    pub fn send_state_event_raw<'a>(
3163        &'a self,
3164        event_type: &'a str,
3165        state_key: &'a str,
3166        content: impl IntoRawStateEventContent,
3167    ) -> SendRawStateEvent<'a> {
3168        SendRawStateEvent::new(self, event_type, state_key, content)
3169    }
3170
3171    /// Strips all information out of an event of the room.
3172    ///
3173    /// Returns the [`redact_event::v3::Response`] from the server.
3174    ///
3175    /// This cannot be undone. Users may redact their own events, and any user
3176    /// with a power level greater than or equal to the redact power level of
3177    /// the room may redact events there.
3178    ///
3179    /// # Arguments
3180    ///
3181    /// * `event_id` - The ID of the event to redact
3182    ///
3183    /// * `reason` - The reason for the event being redacted.
3184    ///
3185    /// * `txn_id` - A unique ID that can be attached to this event as
3186    /// its transaction ID. If not given one is created for the message.
3187    ///
3188    /// # Examples
3189    ///
3190    /// ```no_run
3191    /// use matrix_sdk::ruma::event_id;
3192    ///
3193    /// # async {
3194    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3195    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3196    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3197    /// #
3198    /// if let Some(room) = client.get_room(&room_id) {
3199    ///     let event_id = event_id!("$xxxxxx:example.org");
3200    ///     let reason = Some("Indecent material");
3201    ///     room.redact(&event_id, reason, None).await?;
3202    /// }
3203    /// # anyhow::Ok(()) };
3204    /// ```
3205    #[instrument(skip_all)]
3206    pub async fn redact(
3207        &self,
3208        event_id: &EventId,
3209        reason: Option<&str>,
3210        txn_id: Option<OwnedTransactionId>,
3211    ) -> HttpResult<redact_event::v3::Response> {
3212        let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3213        let request = assign!(
3214            redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3215            { reason: reason.map(ToOwned::to_owned) }
3216        );
3217
3218        self.client.send(request).await
3219    }
3220
3221    /// Get a list of servers that should know this room.
3222    ///
3223    /// Uses the synced members of the room and the suggested [routing
3224    /// algorithm] from the Matrix spec.
3225    ///
3226    /// Returns at most three servers.
3227    ///
3228    /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3229    pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3230        let acl_ev = self
3231            .get_state_event_static::<RoomServerAclEventContent>()
3232            .await?
3233            .and_then(|ev| ev.deserialize().ok());
3234        let acl = acl_ev.as_ref().and_then(|ev| match ev {
3235            SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3236            SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3237        });
3238
3239        // Filter out server names that:
3240        // - Are blocked due to server ACLs
3241        // - Are IP addresses
3242        let members: Vec<_> = self
3243            .members_no_sync(RoomMemberships::JOIN)
3244            .await?
3245            .into_iter()
3246            .filter(|member| {
3247                let server = member.user_id().server_name();
3248                acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3249            })
3250            .collect();
3251
3252        // Get the server of the highest power level user in the room, provided
3253        // they are at least power level 50.
3254        let max = members
3255            .iter()
3256            .max_by_key(|member| member.power_level())
3257            .filter(|max| max.power_level() >= int!(50))
3258            .map(|member| member.user_id().server_name());
3259
3260        // Sort the servers by population.
3261        let servers = members
3262            .iter()
3263            .map(|member| member.user_id().server_name())
3264            .filter(|server| max.filter(|max| max == server).is_none())
3265            .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3266                *servers.entry(server).or_default() += 1;
3267                servers
3268            });
3269        let mut servers: Vec<_> = servers.into_iter().collect();
3270        servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3271
3272        Ok(max
3273            .into_iter()
3274            .chain(servers.into_iter().map(|(name, _)| name))
3275            .take(3)
3276            .map(ToOwned::to_owned)
3277            .collect())
3278    }
3279
3280    /// Get a `matrix.to` permalink to this room.
3281    ///
3282    /// If this room has an alias, we use it. Otherwise, we try to use the
3283    /// synced members in the room for [routing] the room ID.
3284    ///
3285    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3286    pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3287        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3288            return Ok(alias.matrix_to_uri());
3289        }
3290
3291        let via = self.route().await?;
3292        Ok(self.room_id().matrix_to_uri_via(via))
3293    }
3294
3295    /// Get a `matrix:` permalink to this room.
3296    ///
3297    /// If this room has an alias, we use it. Otherwise, we try to use the
3298    /// synced members in the room for [routing] the room ID.
3299    ///
3300    /// # Arguments
3301    ///
3302    /// * `join` - Whether the user should join the room.
3303    ///
3304    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3305    pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3306        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3307            return Ok(alias.matrix_uri(join));
3308        }
3309
3310        let via = self.route().await?;
3311        Ok(self.room_id().matrix_uri_via(via, join))
3312    }
3313
3314    /// Get a `matrix.to` permalink to an event in this room.
3315    ///
3316    /// We try to use the synced members in the room for [routing] the room ID.
3317    ///
3318    /// *Note*: This method does not check if the given event ID is actually
3319    /// part of this room. It needs to be checked before calling this method
3320    /// otherwise the permalink won't work.
3321    ///
3322    /// # Arguments
3323    ///
3324    /// * `event_id` - The ID of the event.
3325    ///
3326    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3327    pub async fn matrix_to_event_permalink(
3328        &self,
3329        event_id: impl Into<OwnedEventId>,
3330    ) -> Result<MatrixToUri> {
3331        // Don't use the alias because an event is tied to a room ID, but an
3332        // alias might point to another room, e.g. after a room upgrade.
3333        let via = self.route().await?;
3334        Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3335    }
3336
3337    /// Get a `matrix:` permalink to an event in this room.
3338    ///
3339    /// We try to use the synced members in the room for [routing] the room ID.
3340    ///
3341    /// *Note*: This method does not check if the given event ID is actually
3342    /// part of this room. It needs to be checked before calling this method
3343    /// otherwise the permalink won't work.
3344    ///
3345    /// # Arguments
3346    ///
3347    /// * `event_id` - The ID of the event.
3348    ///
3349    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3350    pub async fn matrix_event_permalink(
3351        &self,
3352        event_id: impl Into<OwnedEventId>,
3353    ) -> Result<MatrixUri> {
3354        // Don't use the alias because an event is tied to a room ID, but an
3355        // alias might point to another room, e.g. after a room upgrade.
3356        let via = self.route().await?;
3357        Ok(self.room_id().matrix_event_uri_via(event_id, via))
3358    }
3359
3360    /// Get the latest receipt of a user in this room.
3361    ///
3362    /// # Arguments
3363    ///
3364    /// * `receipt_type` - The type of receipt to get.
3365    ///
3366    /// * `thread` - The thread containing the event of the receipt, if any.
3367    ///
3368    /// * `user_id` - The ID of the user.
3369    ///
3370    /// Returns the ID of the event on which the receipt applies and the
3371    /// receipt.
3372    pub async fn load_user_receipt(
3373        &self,
3374        receipt_type: ReceiptType,
3375        thread: ReceiptThread,
3376        user_id: &UserId,
3377    ) -> Result<Option<(OwnedEventId, Receipt)>> {
3378        self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3379    }
3380
3381    /// Load the receipts for an event in this room from storage.
3382    ///
3383    /// # Arguments
3384    ///
3385    /// * `receipt_type` - The type of receipt to get.
3386    ///
3387    /// * `thread` - The thread containing the event of the receipt, if any.
3388    ///
3389    /// * `event_id` - The ID of the event.
3390    ///
3391    /// Returns a list of IDs of users who have sent a receipt for the event and
3392    /// the corresponding receipts.
3393    pub async fn load_event_receipts(
3394        &self,
3395        receipt_type: ReceiptType,
3396        thread: ReceiptThread,
3397        event_id: &EventId,
3398    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3399        self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3400    }
3401
3402    /// Get the push-condition context for this room.
3403    ///
3404    /// Returns `None` if some data couldn't be found. This should only happen
3405    /// in brand new rooms, while we process its state.
3406    pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3407        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3408    }
3409
3410    /// Get the push-condition context for this room, with a choice to include
3411    /// thread subscriptions or not, based on the extra
3412    /// `with_threads_subscriptions` parameter.
3413    ///
3414    /// Returns `None` if some data couldn't be found. This should only happen
3415    /// in brand new rooms, while we process its state.
3416    pub(crate) async fn push_condition_room_ctx_internal(
3417        &self,
3418        with_threads_subscriptions: bool,
3419    ) -> Result<Option<PushConditionRoomCtx>> {
3420        let room_id = self.room_id();
3421        let user_id = self.own_user_id();
3422        let room_info = self.clone_info();
3423        let member_count = room_info.active_members_count();
3424
3425        let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3426            member.name().to_owned()
3427        } else {
3428            return Ok(None);
3429        };
3430
3431        let power_levels = match self.power_levels().await {
3432            Ok(power_levels) => Some(power_levels.into()),
3433            Err(error) => {
3434                if matches!(room_info.state(), RoomState::Joined) {
3435                    // It's normal to not have the power levels in a non-joined room, so don't log
3436                    // the error if the room is not joined
3437                    error!("Could not compute power levels for push conditions: {error}");
3438                }
3439                None
3440            }
3441        };
3442
3443        let mut ctx = assign!(PushConditionRoomCtx::new(
3444            room_id.to_owned(),
3445            UInt::new(member_count).unwrap_or(UInt::MAX),
3446            user_id.to_owned(),
3447            user_display_name,
3448        ),
3449        {
3450            power_levels,
3451        });
3452
3453        if with_threads_subscriptions {
3454            let this = self.clone();
3455            ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3456                let room = this.clone();
3457                Box::pin(async move {
3458                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3459                        maybe_sub.is_some()
3460                    } else {
3461                        false
3462                    }
3463                })
3464            });
3465        }
3466
3467        Ok(Some(ctx))
3468    }
3469
3470    /// Retrieves a [`PushContext`] that can be used to compute the push
3471    /// actions for events.
3472    pub async fn push_context(&self) -> Result<Option<PushContext>> {
3473        self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3474    }
3475
3476    /// Retrieves a [`PushContext`] that can be used to compute the push actions
3477    /// for events, with a choice to include thread subscriptions or not,
3478    /// based on the extra `with_threads_subscriptions` parameter.
3479    #[instrument(skip(self))]
3480    pub(crate) async fn push_context_internal(
3481        &self,
3482        with_threads_subscriptions: bool,
3483    ) -> Result<Option<PushContext>> {
3484        let Some(push_condition_room_ctx) =
3485            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3486        else {
3487            debug!("Could not aggregate push context");
3488            return Ok(None);
3489        };
3490        let push_rules = self.client().account().push_rules().await?;
3491        Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3492    }
3493
3494    /// Get the push actions for the given event with the current room state.
3495    ///
3496    /// Note that it is possible that no push action is returned because the
3497    /// current room state does not have all the required state events.
3498    pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3499        if let Some(ctx) = self.push_context().await? {
3500            Ok(Some(ctx.for_event(event).await))
3501        } else {
3502            Ok(None)
3503        }
3504    }
3505
3506    /// The membership details of the (latest) invite for the logged-in user in
3507    /// this room.
3508    pub async fn invite_details(&self) -> Result<Invite> {
3509        let state = self.state();
3510
3511        if state != RoomState::Invited {
3512            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3513        }
3514
3515        let invitee = self
3516            .get_member_no_sync(self.own_user_id())
3517            .await?
3518            .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3519        let event = invitee.event();
3520        let inviter_id = event.sender();
3521        let inviter = self.get_member_no_sync(inviter_id).await?;
3522        Ok(Invite { invitee, inviter })
3523    }
3524
3525    /// Get the membership details for the current user.
3526    ///
3527    /// Returns:
3528    ///     - If the user was present in the room, a
3529    ///       [`RoomMemberWithSenderInfo`] containing both the user info and the
3530    ///       member info of the sender of the `m.room.member` event.
3531    ///     - If the current user is not present, an error.
3532    pub async fn member_with_sender_info(
3533        &self,
3534        user_id: &UserId,
3535    ) -> Result<RoomMemberWithSenderInfo> {
3536        let Some(member) = self.get_member_no_sync(user_id).await? else {
3537            return Err(Error::InsufficientData);
3538        };
3539
3540        let sender_member =
3541            if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3542                // If the sender room member info is already available, return it
3543                Some(member)
3544            } else if self.are_members_synced() {
3545                // The room members are synced and we couldn't find the sender info
3546                None
3547            } else if self.sync_members().await.is_ok() {
3548                // Try getting the sender room member info again after syncing
3549                self.get_member_no_sync(member.event().sender()).await?
3550            } else {
3551                None
3552            };
3553
3554        Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3555    }
3556
3557    /// Forget this room.
3558    ///
3559    /// This communicates to the homeserver that it should forget the room.
3560    ///
3561    /// Only left or banned-from rooms can be forgotten.
3562    pub async fn forget(&self) -> Result<()> {
3563        let state = self.state();
3564        match state {
3565            RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3566                return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3567                    "Left / Banned",
3568                    state,
3569                ))));
3570            }
3571            RoomState::Left | RoomState::Banned => {}
3572        }
3573
3574        let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3575        let _response = self.client.send(request).await?;
3576
3577        // If it was a DM, remove the room from the `m.direct` global account data.
3578        if self.inner.direct_targets_length() != 0
3579            && let Err(e) = self.set_is_direct(false).await
3580        {
3581            // It is not important whether we managed to remove the room, it will not have
3582            // any consequences, so just log the error.
3583            warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3584        }
3585
3586        self.client.base_client().forget_room(self.inner.room_id()).await?;
3587
3588        Ok(())
3589    }
3590
3591    fn ensure_room_joined(&self) -> Result<()> {
3592        let state = self.state();
3593        if state == RoomState::Joined {
3594            Ok(())
3595        } else {
3596            Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3597        }
3598    }
3599
3600    /// Get the notification mode.
3601    pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3602        if !matches!(self.state(), RoomState::Joined) {
3603            return None;
3604        }
3605
3606        let notification_settings = self.client().notification_settings().await;
3607
3608        // Get the user-defined mode if available
3609        let notification_mode =
3610            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3611
3612        if notification_mode.is_some() {
3613            notification_mode
3614        } else if let Ok(is_encrypted) =
3615            self.latest_encryption_state().await.map(|state| state.is_encrypted())
3616        {
3617            // Otherwise, if encrypted status is available, get the default mode for this
3618            // type of room.
3619            // From the point of view of notification settings, a `one-to-one` room is one
3620            // that involves exactly two people.
3621            let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3622            let default_mode = notification_settings
3623                .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3624                .await;
3625            Some(default_mode)
3626        } else {
3627            None
3628        }
3629    }
3630
3631    /// Get the user-defined notification mode.
3632    ///
3633    /// The result is cached for fast and non-async call. To read the cached
3634    /// result, use
3635    /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3636    //
3637    // Note for maintainers:
3638    //
3639    // The fact the result is cached is an important property. If you change that in
3640    // the future, please review all calls to this method.
3641    pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3642        if !matches!(self.state(), RoomState::Joined) {
3643            return None;
3644        }
3645
3646        let notification_settings = self.client().notification_settings().await;
3647
3648        // Get the user-defined mode if available.
3649        let mode =
3650            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3651
3652        if let Some(mode) = mode {
3653            self.update_cached_user_defined_notification_mode(mode);
3654        }
3655
3656        mode
3657    }
3658
3659    /// Report an event as inappropriate to the homeserver's administrator.
3660    ///
3661    /// # Arguments
3662    ///
3663    /// * `event_id` - The ID of the event to report.
3664    /// * `score` - The score to rate this content.
3665    /// * `reason` - The reason the content is being reported.
3666    ///
3667    /// # Errors
3668    ///
3669    /// Returns an error if the room is not joined or if an error occurs with
3670    /// the request.
3671    pub async fn report_content(
3672        &self,
3673        event_id: OwnedEventId,
3674        score: Option<ReportedContentScore>,
3675        reason: Option<String>,
3676    ) -> Result<report_content::v3::Response> {
3677        let state = self.state();
3678        if state != RoomState::Joined {
3679            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3680        }
3681
3682        let request = report_content::v3::Request::new(
3683            self.inner.room_id().to_owned(),
3684            event_id,
3685            score.map(Into::into),
3686            reason,
3687        );
3688        Ok(self.client.send(request).await?)
3689    }
3690
3691    /// Reports a room as inappropriate to the server.
3692    /// The caller is not required to be joined to the room to report it.
3693    ///
3694    /// # Arguments
3695    ///
3696    /// * `reason` - The reason the room is being reported.
3697    ///
3698    /// # Errors
3699    ///
3700    /// Returns an error if the room is not found or on rate limit
3701    pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3702        let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3703
3704        Ok(self.client.send(request).await?)
3705    }
3706
3707    /// Set a flag on the room to indicate that the user has explicitly marked
3708    /// it as (un)read.
3709    ///
3710    /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3711    /// value as `unread`.
3712    pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3713        if self.is_marked_unread() == unread {
3714            // The request is not necessary.
3715            return Ok(());
3716        }
3717
3718        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3719
3720        let content = MarkedUnreadEventContent::new(unread);
3721
3722        let request = set_room_account_data::v3::Request::new(
3723            user_id.to_owned(),
3724            self.inner.room_id().to_owned(),
3725            &content,
3726        )?;
3727
3728        self.client.send(request).await?;
3729        Ok(())
3730    }
3731
3732    /// Returns the [`RoomEventCache`] associated to this room, assuming the
3733    /// global [`EventCache`] has been enabled for subscription.
3734    pub async fn event_cache(
3735        &self,
3736    ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3737        self.client.event_cache().for_room(self.room_id()).await
3738    }
3739
3740    /// Get the beacon information event in the room for the `user_id`.
3741    ///
3742    /// # Errors
3743    ///
3744    /// Returns an error if the event is redacted, stripped, not found or could
3745    /// not be deserialized.
3746    pub(crate) async fn get_user_beacon_info(
3747        &self,
3748        user_id: &UserId,
3749    ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3750        let raw_event = self
3751            .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3752            .await?
3753            .ok_or(BeaconError::NotFound)?;
3754
3755        match raw_event.deserialize()? {
3756            SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3757            SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3758            SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3759        }
3760    }
3761
3762    /// Start sharing live location in the room.
3763    ///
3764    /// # Arguments
3765    ///
3766    /// * `duration_millis` - The duration for which the live location is
3767    ///   shared, in milliseconds.
3768    /// * `description` - An optional description for the live location share.
3769    ///
3770    /// # Errors
3771    ///
3772    /// Returns an error if the room is not joined or if the state event could
3773    /// not be sent.
3774    pub async fn start_live_location_share(
3775        &self,
3776        duration_millis: u64,
3777        description: Option<String>,
3778    ) -> Result<send_state_event::v3::Response> {
3779        self.ensure_room_joined()?;
3780
3781        self.send_state_event_for_key(
3782            self.own_user_id(),
3783            BeaconInfoEventContent::new(
3784                description,
3785                Duration::from_millis(duration_millis),
3786                true,
3787                None,
3788            ),
3789        )
3790        .await
3791    }
3792
3793    /// Stop sharing live location in the room.
3794    ///
3795    /// # Errors
3796    ///
3797    /// Returns an error if the room is not joined, if the beacon information
3798    /// is redacted or stripped, or if the state event is not found.
3799    pub async fn stop_live_location_share(
3800        &self,
3801    ) -> Result<send_state_event::v3::Response, BeaconError> {
3802        self.ensure_room_joined()?;
3803
3804        let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3805        beacon_info_event.content.stop();
3806        Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3807    }
3808
3809    /// Send a location beacon event in the current room.
3810    ///
3811    /// # Arguments
3812    ///
3813    /// * `geo_uri` - The geo URI of the location beacon.
3814    ///
3815    /// # Errors
3816    ///
3817    /// Returns an error if the room is not joined, if the beacon information
3818    /// is redacted or stripped, if the location share is no longer live,
3819    /// or if the state event is not found.
3820    pub async fn send_location_beacon(
3821        &self,
3822        geo_uri: String,
3823    ) -> Result<send_message_event::v3::Response, BeaconError> {
3824        self.ensure_room_joined()?;
3825
3826        let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3827
3828        if beacon_info_event.content.is_live() {
3829            let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3830            Ok(self.send(content).await?)
3831        } else {
3832            Err(BeaconError::NotLive)
3833        }
3834    }
3835
3836    /// Store the given `ComposerDraft` in the state store using the current
3837    /// room id and optional thread root id as identifier.
3838    pub async fn save_composer_draft(
3839        &self,
3840        draft: ComposerDraft,
3841        thread_root: Option<&EventId>,
3842    ) -> Result<()> {
3843        self.client
3844            .state_store()
3845            .set_kv_data(
3846                StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3847                StateStoreDataValue::ComposerDraft(draft),
3848            )
3849            .await?;
3850        Ok(())
3851    }
3852
3853    /// Retrieve the `ComposerDraft` stored in the state store for this room
3854    /// and given thread, if any.
3855    pub async fn load_composer_draft(
3856        &self,
3857        thread_root: Option<&EventId>,
3858    ) -> Result<Option<ComposerDraft>> {
3859        let data = self
3860            .client
3861            .state_store()
3862            .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3863            .await?;
3864        Ok(data.and_then(|d| d.into_composer_draft()))
3865    }
3866
3867    /// Remove the `ComposerDraft` stored in the state store for this room
3868    /// and given thread, if any.
3869    pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3870        self.client
3871            .state_store()
3872            .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3873            .await?;
3874        Ok(())
3875    }
3876
3877    /// Load pinned state events for a room from the `/state` endpoint in the
3878    /// home server.
3879    pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3880        let response = self
3881            .client
3882            .send(get_state_event_for_key::v3::Request::new(
3883                self.room_id().to_owned(),
3884                StateEventType::RoomPinnedEvents,
3885                "".to_owned(),
3886            ))
3887            .await;
3888
3889        match response {
3890            Ok(response) => Ok(Some(
3891                response
3892                    .into_content()
3893                    .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3894                    .pinned,
3895            )),
3896            Err(http_error) => match http_error.as_client_api_error() {
3897                Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3898                _ => Err(http_error.into()),
3899            },
3900        }
3901    }
3902
3903    /// Observe live location sharing events for this room.
3904    ///
3905    /// The returned observable will receive the newest event for each sync
3906    /// response that contains an `m.beacon` event.
3907    ///
3908    /// Returns a stream of [`ObservableLiveLocation`] events from other users
3909    /// in the room, excluding the live location events of the room's own user.
3910    pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3911        ObservableLiveLocation::new(&self.client, self.room_id())
3912    }
3913
3914    /// Subscribe to knock requests in this `Room`.
3915    ///
3916    /// The current requests to join the room will be emitted immediately
3917    /// when subscribing.
3918    ///
3919    /// A new set of knock requests will be emitted whenever:
3920    /// - A new member event is received.
3921    /// - A knock request is marked as seen.
3922    /// - A sync is gappy (limited), so room membership information may be
3923    ///   outdated.
3924    ///
3925    /// Returns both a stream of knock requests and a handle for a task that
3926    /// will clean up the seen knock request ids when possible.
3927    pub async fn subscribe_to_knock_requests(
3928        &self,
3929    ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
3930        let this = Arc::new(self.clone());
3931
3932        let room_member_events_observer =
3933            self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3934
3935        let current_seen_ids = self.get_seen_knock_request_ids().await?;
3936        let mut seen_request_ids_stream = self
3937            .seen_knock_request_ids_map
3938            .subscribe()
3939            .await
3940            .map(|values| values.unwrap_or_default());
3941
3942        let mut room_info_stream = self.subscribe_info();
3943
3944        // Spawn a task that will clean up the seen knock request ids when updated room
3945        // members are received
3946        let clear_seen_ids_handle = spawn({
3947            let this = self.clone();
3948            async move {
3949                let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3950                while member_updates_stream.recv().await.is_ok() {
3951                    // If room members were updated, try to remove outdated seen knock request ids
3952                    if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3953                        warn!("Failed to remove seen knock requests: {err}")
3954                    }
3955                }
3956            }
3957        });
3958
3959        let combined_stream = stream! {
3960            // Emit current requests to join
3961            match this.get_current_join_requests(&current_seen_ids).await {
3962                Ok(initial_requests) => yield initial_requests,
3963                Err(err) => warn!("Failed to get initial requests to join: {err}")
3964            }
3965
3966            let mut requests_stream = room_member_events_observer.subscribe();
3967            let mut seen_ids = current_seen_ids.clone();
3968
3969            loop {
3970                // This is equivalent to a combine stream operation, triggering a new emission
3971                // when any of the branches changes
3972                tokio::select! {
3973                    Some((event, _)) = requests_stream.next() => {
3974                        if let Some(event) = event.as_original() {
3975                            // If we can calculate the membership change, try to emit only when needed
3976                            let emit = if event.prev_content().is_some() {
3977                                matches!(event.membership_change(),
3978                                    MembershipChange::Banned |
3979                                    MembershipChange::Knocked |
3980                                    MembershipChange::KnockAccepted |
3981                                    MembershipChange::KnockDenied |
3982                                    MembershipChange::KnockRetracted
3983                                )
3984                            } else {
3985                                // If we can't calculate the membership change, assume we need to
3986                                // emit updated values
3987                                true
3988                            };
3989
3990                            if emit {
3991                                match this.get_current_join_requests(&seen_ids).await {
3992                                    Ok(requests) => yield requests,
3993                                    Err(err) => {
3994                                        warn!("Failed to get updated knock requests on new member event: {err}")
3995                                    }
3996                                }
3997                            }
3998                        }
3999                    }
4000
4001                    Some(new_seen_ids) = seen_request_ids_stream.next() => {
4002                        // Update the current seen ids
4003                        seen_ids = new_seen_ids;
4004
4005                        // If seen requests have changed we need to recalculate
4006                        // all the knock requests
4007                        match this.get_current_join_requests(&seen_ids).await {
4008                            Ok(requests) => yield requests,
4009                            Err(err) => {
4010                                warn!("Failed to get updated knock requests on seen ids changed: {err}")
4011                            }
4012                        }
4013                    }
4014
4015                    Some(room_info) = room_info_stream.next() => {
4016                        // We need to emit new items when we may have missing room members:
4017                        // this usually happens after a gappy (limited) sync
4018                        if !room_info.are_members_synced() {
4019                            match this.get_current_join_requests(&seen_ids).await {
4020                                Ok(requests) => yield requests,
4021                                Err(err) => {
4022                                    warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4023                                }
4024                            }
4025                        }
4026                    }
4027                    // If the streams in all branches are closed, stop the loop
4028                    else => break,
4029                }
4030            }
4031        };
4032
4033        Ok((combined_stream, clear_seen_ids_handle))
4034    }
4035
4036    async fn get_current_join_requests(
4037        &self,
4038        seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4039    ) -> Result<Vec<KnockRequest>> {
4040        Ok(self
4041            .members(RoomMemberships::KNOCK)
4042            .await?
4043            .into_iter()
4044            .filter_map(|member| {
4045                let event_id = member.event().event_id()?;
4046                Some(KnockRequest::new(
4047                    self,
4048                    event_id,
4049                    member.event().timestamp(),
4050                    KnockRequestMemberInfo::from_member(&member),
4051                    seen_request_ids.contains_key(event_id),
4052                ))
4053            })
4054            .collect())
4055    }
4056
4057    /// Access the room settings related to privacy and visibility.
4058    pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4059        RoomPrivacySettings::new(&self.inner, &self.client)
4060    }
4061
4062    /// Retrieve a list of all the threads for the current room.
4063    ///
4064    /// Since this client-server API is paginated, the return type may include a
4065    /// token used to resuming back-pagination into the list of results, in
4066    /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4067    /// [`ListThreadsOptions::from`] to continue the pagination
4068    /// from the previous position.
4069    pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4070        let request = opts.into_request(self.room_id());
4071
4072        let response = self.client.send(request).await?;
4073
4074        let push_ctx = self.push_context().await?;
4075        let chunk = join_all(
4076            response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4077        )
4078        .await;
4079
4080        Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4081    }
4082
4083    /// Retrieve a list of relations for the given event, according to the given
4084    /// options.
4085    ///
4086    /// Since this client-server API is paginated, the return type may include a
4087    /// token used to resuming back-pagination into the list of results, in
4088    /// [`Relations::prev_batch_token`]. This token can be fed back into
4089    /// [`RelationsOptions::from`] to continue the pagination from the previous
4090    /// position.
4091    ///
4092    /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4093    /// then it must be used with the same
4094    /// [`RelationsOptions::include_relations`] value as the request that
4095    /// returns the `from` token, otherwise the server behavior is undefined.
4096    pub async fn relations(
4097        &self,
4098        event_id: OwnedEventId,
4099        opts: RelationsOptions,
4100    ) -> Result<Relations> {
4101        opts.send(self, event_id).await
4102    }
4103
4104    /// Search this room's [`RoomIndex`] for query and return at most
4105    /// max_number_of_results results.
4106    #[cfg(feature = "experimental-search")]
4107    pub async fn search(
4108        &self,
4109        query: &str,
4110        max_number_of_results: usize,
4111        pagination_offset: Option<usize>,
4112    ) -> Result<Vec<OwnedEventId>, IndexError> {
4113        let mut search_index_guard = self.client.search_index().lock().await;
4114        search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4115    }
4116
4117    /// Subscribe to a given thread in this room.
4118    ///
4119    /// This will subscribe the user to the thread, so that they will receive
4120    /// notifications for that thread specifically.
4121    ///
4122    /// # Arguments
4123    ///
4124    /// - `thread_root`: The ID of the thread root event to subscribe to.
4125    /// - `automatic`: Whether the subscription was made automatically by a
4126    ///   client, not by manual user choice. If set, must include the latest
4127    ///   event ID that's known in the thread and that is causing the automatic
4128    ///   subscription. If unset (i.e. we're now subscribing manually) and there
4129    ///   was a previous automatic subscription, the subscription will be
4130    ///   overridden to a manual one instead.
4131    ///
4132    /// # Returns
4133    ///
4134    /// - A 404 error if the event isn't known, or isn't a thread root.
4135    /// - An `Ok` result if the subscription was successful, or if the server
4136    ///   skipped an automatic subscription (as the user unsubscribed from the
4137    ///   thread after the event causing the automatic subscription).
4138    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4139    pub async fn subscribe_thread(
4140        &self,
4141        thread_root: OwnedEventId,
4142        automatic: Option<OwnedEventId>,
4143    ) -> Result<()> {
4144        let is_automatic = automatic.is_some();
4145
4146        match self
4147            .client
4148            .send(subscribe_thread::unstable::Request::new(
4149                self.room_id().to_owned(),
4150                thread_root.clone(),
4151                automatic,
4152            ))
4153            .await
4154        {
4155            Ok(_response) => {
4156                trace!("Server acknowledged the thread subscription; saving in db");
4157
4158                // Immediately save the result into the database.
4159                self.client
4160                    .state_store()
4161                    .upsert_thread_subscription(
4162                        self.room_id(),
4163                        &thread_root,
4164                        StoredThreadSubscription {
4165                            status: ThreadSubscriptionStatus::Subscribed {
4166                                automatic: is_automatic,
4167                            },
4168                            bump_stamp: None,
4169                        },
4170                    )
4171                    .await?;
4172
4173                Ok(())
4174            }
4175
4176            Err(err) => {
4177                if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4178                    // In this case: the server indicates that the user unsubscribed *after* the
4179                    // event ID we've used in an automatic subscription; don't
4180                    // save the subscription state in the database, as the
4181                    // previous one should be more correct.
4182                    trace!("Thread subscription skipped: {err}");
4183                    Ok(())
4184                } else {
4185                    // Forward the error to the caller.
4186                    Err(err.into())
4187                }
4188            }
4189        }
4190    }
4191
4192    /// Subscribe to a thread if needed, based on a current subscription to it.
4193    ///
4194    /// This is like [`Self::subscribe_thread`], but it first checks if the user
4195    /// has already subscribed to a thread, so as to minimize sending
4196    /// unnecessary subscriptions which would be ignored by the server.
4197    pub async fn subscribe_thread_if_needed(
4198        &self,
4199        thread_root: &EventId,
4200        automatic: Option<OwnedEventId>,
4201    ) -> Result<()> {
4202        if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4203            // If we have a previous subscription, we should only send the new one if it's
4204            // manual and the previous one was automatic.
4205            if !prev_sub.automatic || automatic.is_some() {
4206                // Either we had already a manual subscription, or we had an automatic one and
4207                // the new one is automatic too: nothing to do!
4208                return Ok(());
4209            }
4210        }
4211        self.subscribe_thread(thread_root.to_owned(), automatic).await
4212    }
4213
4214    /// Unsubscribe from a given thread in this room.
4215    ///
4216    /// # Arguments
4217    ///
4218    /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4219    ///
4220    /// # Returns
4221    ///
4222    /// - An `Ok` result if the unsubscription was successful, or the thread was
4223    ///   already unsubscribed.
4224    /// - A 404 error if the event isn't known, or isn't a thread root.
4225    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4226    pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4227        self.client
4228            .send(unsubscribe_thread::unstable::Request::new(
4229                self.room_id().to_owned(),
4230                thread_root.clone(),
4231            ))
4232            .await?;
4233
4234        trace!("Server acknowledged the thread subscription removal; removed it from db too");
4235
4236        // Immediately save the result into the database.
4237        self.client
4238            .state_store()
4239            .upsert_thread_subscription(
4240                self.room_id(),
4241                &thread_root,
4242                StoredThreadSubscription {
4243                    status: ThreadSubscriptionStatus::Unsubscribed,
4244                    bump_stamp: None,
4245                },
4246            )
4247            .await?;
4248
4249        Ok(())
4250    }
4251
4252    /// Return the current thread subscription for the given thread root in this
4253    /// room.
4254    ///
4255    /// # Arguments
4256    ///
4257    /// - `thread_root`: The ID of the thread root event to get the subscription
4258    ///   for.
4259    ///
4260    /// # Returns
4261    ///
4262    /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4263    ///   subscription information.
4264    /// - An `Ok` result with `None` if the subscription does not exist, or the
4265    ///   event couldn't be found, or the event isn't a thread.
4266    /// - An error if the request fails for any other reason, such as a network
4267    ///   error.
4268    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4269    pub async fn fetch_thread_subscription(
4270        &self,
4271        thread_root: OwnedEventId,
4272    ) -> Result<Option<ThreadSubscription>> {
4273        let result = self
4274            .client
4275            .send(get_thread_subscription::unstable::Request::new(
4276                self.room_id().to_owned(),
4277                thread_root.clone(),
4278            ))
4279            .await;
4280
4281        let subscription = match result {
4282            Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4283            Err(http_error) => match http_error.as_client_api_error() {
4284                Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4285                _ => return Err(http_error.into()),
4286            },
4287        };
4288
4289        // Keep the database in sync.
4290        if let Some(sub) = &subscription {
4291            self.client
4292                .state_store()
4293                .upsert_thread_subscription(
4294                    self.room_id(),
4295                    &thread_root,
4296                    StoredThreadSubscription {
4297                        status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4298                        bump_stamp: None,
4299                    },
4300                )
4301                .await?;
4302        } else {
4303            // If the subscription was not found, remove it from the database.
4304            self.client
4305                .state_store()
4306                .remove_thread_subscription(self.room_id(), &thread_root)
4307                .await?;
4308        }
4309
4310        Ok(subscription)
4311    }
4312
4313    /// Return the current thread subscription for the given thread root in this
4314    /// room, by getting it from storage if possible, or fetching it from
4315    /// network otherwise.
4316    ///
4317    /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4318    /// this method.
4319    pub async fn load_or_fetch_thread_subscription(
4320        &self,
4321        thread_root: &EventId,
4322    ) -> Result<Option<ThreadSubscription>> {
4323        // If the thread subscriptions list is outdated, fetch from the server.
4324        if self.client.thread_subscription_catchup().is_outdated() {
4325            return self.fetch_thread_subscription(thread_root.to_owned()).await;
4326        }
4327
4328        // Otherwise, we can rely on the store information.
4329        Ok(self
4330            .client
4331            .state_store()
4332            .load_thread_subscription(self.room_id(), thread_root)
4333            .await
4334            .map(|maybe_sub| {
4335                maybe_sub.and_then(|stored| match stored.status {
4336                    ThreadSubscriptionStatus::Unsubscribed => None,
4337                    ThreadSubscriptionStatus::Subscribed { automatic } => {
4338                        Some(ThreadSubscription { automatic })
4339                    }
4340                })
4341            })?)
4342    }
4343}
4344
4345#[cfg(feature = "e2e-encryption")]
4346impl RoomIdentityProvider for Room {
4347    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
4348        Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
4349    }
4350
4351    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
4352        Box::pin(async {
4353            let members = self
4354                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
4355                .await
4356                .unwrap_or_else(|_| Default::default());
4357
4358            let mut ret: Vec<UserIdentity> = Vec::new();
4359            for member in members {
4360                if let Some(i) = self.user_identity(member.user_id()).await {
4361                    ret.push(i);
4362                }
4363            }
4364            ret
4365        })
4366    }
4367
4368    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
4369        Box::pin(async {
4370            self.client
4371                .encryption()
4372                .get_user_identity(user_id)
4373                .await
4374                .unwrap_or(None)
4375                .map(|u| u.underlying_identity())
4376        })
4377    }
4378}
4379
4380/// A wrapper for a weak client and a room id that allows to lazily retrieve a
4381/// room, only when needed.
4382#[derive(Clone, Debug)]
4383pub(crate) struct WeakRoom {
4384    client: WeakClient,
4385    room_id: OwnedRoomId,
4386}
4387
4388impl WeakRoom {
4389    /// Create a new `WeakRoom` given its weak components.
4390    pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4391        Self { client, room_id }
4392    }
4393
4394    /// Attempts to reconstruct the room.
4395    pub fn get(&self) -> Option<Room> {
4396        self.client.get().and_then(|client| client.get_room(&self.room_id))
4397    }
4398
4399    /// The room id for that room.
4400    pub fn room_id(&self) -> &RoomId {
4401        &self.room_id
4402    }
4403}
4404
4405/// Details of the (latest) invite.
4406#[derive(Debug, Clone)]
4407pub struct Invite {
4408    /// Who has been invited.
4409    pub invitee: RoomMember,
4410    /// Who sent the invite.
4411    pub inviter: Option<RoomMember>,
4412}
4413
4414#[derive(Error, Debug)]
4415enum InvitationError {
4416    #[error("No membership event found")]
4417    EventMissing,
4418}
4419
4420/// Receipts to send all at once.
4421#[derive(Debug, Clone, Default)]
4422#[non_exhaustive]
4423pub struct Receipts {
4424    /// Fully-read marker (room account data).
4425    pub fully_read: Option<OwnedEventId>,
4426    /// Read receipt (public ephemeral room event).
4427    pub public_read_receipt: Option<OwnedEventId>,
4428    /// Read receipt (private ephemeral room event).
4429    pub private_read_receipt: Option<OwnedEventId>,
4430}
4431
4432impl Receipts {
4433    /// Create an empty `Receipts`.
4434    pub fn new() -> Self {
4435        Self::default()
4436    }
4437
4438    /// Set the last event the user has read.
4439    ///
4440    /// It means that the user has read all the events before this event.
4441    ///
4442    /// This is a private marker only visible by the user.
4443    ///
4444    /// Note that this is technically not a receipt as it is persisted in the
4445    /// room account data.
4446    pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4447        self.fully_read = event_id.into();
4448        self
4449    }
4450
4451    /// Set the last event presented to the user and forward it to the other
4452    /// users in the room.
4453    ///
4454    /// This is used to reset the unread messages/notification count and
4455    /// advertise to other users the last event that the user has likely seen.
4456    pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4457        self.public_read_receipt = event_id.into();
4458        self
4459    }
4460
4461    /// Set the last event presented to the user and don't forward it.
4462    ///
4463    /// This is used to reset the unread messages/notification count.
4464    pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4465        self.private_read_receipt = event_id.into();
4466        self
4467    }
4468
4469    /// Whether this `Receipts` is empty.
4470    pub fn is_empty(&self) -> bool {
4471        self.fully_read.is_none()
4472            && self.public_read_receipt.is_none()
4473            && self.private_read_receipt.is_none()
4474    }
4475}
4476
4477/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
4478/// listed by a room, possibly validated by checking the space's state.
4479#[derive(Debug)]
4480pub enum ParentSpace {
4481    /// The room recognizes the given room as its parent, and the parent
4482    /// recognizes it as its child.
4483    Reciprocal(Room),
4484    /// The room recognizes the given room as its parent, but the parent does
4485    /// not recognizes it as its child. However, the author of the
4486    /// `m.space.parent` event in the room has a sufficient power level in the
4487    /// parent to create the child event.
4488    WithPowerlevel(Room),
4489    /// The room recognizes the given room as its parent, but the parent does
4490    /// not recognizes it as its child.
4491    Illegitimate(Room),
4492    /// The room recognizes the given id as its parent room, but we cannot check
4493    /// whether the parent recognizes it as its child.
4494    Unverifiable(OwnedRoomId),
4495}
4496
4497/// The score to rate an inappropriate content.
4498///
4499/// Must be a value between `0`, inoffensive, and `-100`, very offensive.
4500#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
4501pub struct ReportedContentScore(i8);
4502
4503impl ReportedContentScore {
4504    /// The smallest value that can be represented by this type.
4505    ///
4506    /// This is for very offensive content.
4507    pub const MIN: Self = Self(-100);
4508
4509    /// The largest value that can be represented by this type.
4510    ///
4511    /// This is for inoffensive content.
4512    pub const MAX: Self = Self(0);
4513
4514    /// Try to create a `ReportedContentScore` from the provided `i8`.
4515    ///
4516    /// Returns `None` if it is smaller than [`ReportedContentScore::MIN`] or
4517    /// larger than [`ReportedContentScore::MAX`] .
4518    ///
4519    /// This is the same as the `TryFrom<i8>` implementation for
4520    /// `ReportedContentScore`, except that it returns an `Option` instead
4521    /// of a `Result`.
4522    pub fn new(value: i8) -> Option<Self> {
4523        value.try_into().ok()
4524    }
4525
4526    /// Create a `ReportedContentScore` from the provided `i8` clamped to the
4527    /// acceptable interval.
4528    ///
4529    /// The given value gets clamped into the closed interval between
4530    /// [`ReportedContentScore::MIN`] and [`ReportedContentScore::MAX`].
4531    pub fn new_saturating(value: i8) -> Self {
4532        if value > Self::MAX {
4533            Self::MAX
4534        } else if value < Self::MIN {
4535            Self::MIN
4536        } else {
4537            Self(value)
4538        }
4539    }
4540
4541    /// The value of this score.
4542    pub fn value(&self) -> i8 {
4543        self.0
4544    }
4545}
4546
4547impl PartialEq<i8> for ReportedContentScore {
4548    fn eq(&self, other: &i8) -> bool {
4549        self.0.eq(other)
4550    }
4551}
4552
4553impl PartialEq<ReportedContentScore> for i8 {
4554    fn eq(&self, other: &ReportedContentScore) -> bool {
4555        self.eq(&other.0)
4556    }
4557}
4558
4559impl PartialOrd<i8> for ReportedContentScore {
4560    fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4561        self.0.partial_cmp(other)
4562    }
4563}
4564
4565impl PartialOrd<ReportedContentScore> for i8 {
4566    fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4567        self.partial_cmp(&other.0)
4568    }
4569}
4570
4571impl From<ReportedContentScore> for Int {
4572    fn from(value: ReportedContentScore) -> Self {
4573        value.0.into()
4574    }
4575}
4576
4577impl TryFrom<i8> for ReportedContentScore {
4578    type Error = TryFromReportedContentScoreError;
4579
4580    fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4581        if value > Self::MAX || value < Self::MIN {
4582            Err(TryFromReportedContentScoreError(()))
4583        } else {
4584            Ok(Self(value))
4585        }
4586    }
4587}
4588
4589impl TryFrom<i16> for ReportedContentScore {
4590    type Error = TryFromReportedContentScoreError;
4591
4592    fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4593        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4594        value.try_into()
4595    }
4596}
4597
4598impl TryFrom<i32> for ReportedContentScore {
4599    type Error = TryFromReportedContentScoreError;
4600
4601    fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4602        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4603        value.try_into()
4604    }
4605}
4606
4607impl TryFrom<i64> for ReportedContentScore {
4608    type Error = TryFromReportedContentScoreError;
4609
4610    fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4611        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4612        value.try_into()
4613    }
4614}
4615
4616impl TryFrom<Int> for ReportedContentScore {
4617    type Error = TryFromReportedContentScoreError;
4618
4619    fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4620        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4621        value.try_into()
4622    }
4623}
4624
4625trait EventSource {
4626    fn get_event(
4627        &self,
4628        event_id: &EventId,
4629    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4630}
4631
4632impl EventSource for &Room {
4633    async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4634        self.load_or_fetch_event(event_id, None).await
4635    }
4636}
4637
4638/// The error type returned when a checked `ReportedContentScore` conversion
4639/// fails.
4640#[derive(Debug, Clone, Error)]
4641#[error("out of range conversion attempted")]
4642pub struct TryFromReportedContentScoreError(());
4643
4644/// Contains the current user's room member info and the optional room member
4645/// info of the sender of the `m.room.member` event that this info represents.
4646#[derive(Debug)]
4647pub struct RoomMemberWithSenderInfo {
4648    /// The actual room member.
4649    pub room_member: RoomMember,
4650    /// The info of the sender of the event `room_member` is based on, if
4651    /// available.
4652    pub sender_info: Option<RoomMember>,
4653}
4654
4655#[cfg(all(test, not(target_family = "wasm")))]
4656mod tests {
4657    use std::collections::BTreeMap;
4658
4659    use matrix_sdk_base::{ComposerDraft, store::ComposerDraftType};
4660    use matrix_sdk_test::{
4661        JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
4662        event_factory::EventFactory, test_json,
4663    };
4664    use ruma::{
4665        RoomVersionId, event_id,
4666        events::{relation::RelationType, room::member::MembershipState},
4667        int, owned_event_id, room_id, user_id,
4668    };
4669    use wiremock::{
4670        Mock, MockServer, ResponseTemplate,
4671        matchers::{header, method, path_regex},
4672    };
4673
4674    use super::ReportedContentScore;
4675    use crate::{
4676        Client,
4677        config::RequestConfig,
4678        room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4679        test_utils::{
4680            client::mock_matrix_session,
4681            logged_in_client,
4682            mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4683        },
4684    };
4685
4686    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4687    #[async_test]
4688    async fn test_cache_invalidation_while_encrypt() {
4689        use matrix_sdk_base::store::RoomLoadSettings;
4690        use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4691
4692        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4693        let session = mock_matrix_session();
4694
4695        let client = Client::builder()
4696            .homeserver_url("http://localhost:1234")
4697            .request_config(RequestConfig::new().disable_retry())
4698            .sqlite_store(&sqlite_path, None)
4699            .build()
4700            .await
4701            .unwrap();
4702        client
4703            .matrix_auth()
4704            .restore_session(session.clone(), RoomLoadSettings::default())
4705            .await
4706            .unwrap();
4707
4708        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4709
4710        // Mock receiving an event to create an internal room.
4711        let server = MockServer::start().await;
4712        {
4713            Mock::given(method("GET"))
4714                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4715                .and(header("authorization", "Bearer 1234"))
4716                .respond_with(
4717                    ResponseTemplate::new(200)
4718                        .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
4719                )
4720                .mount(&server)
4721                .await;
4722            let response = SyncResponseBuilder::default()
4723                .add_joined_room(
4724                    JoinedRoomBuilder::default()
4725                        .add_state_event(StateTestEvent::Member)
4726                        .add_state_event(StateTestEvent::PowerLevels)
4727                        .add_state_event(StateTestEvent::Encryption),
4728                )
4729                .build_sync_response();
4730            client.base_client().receive_sync_response(response).await.unwrap();
4731        }
4732
4733        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4734
4735        // Step 1, preshare the room keys.
4736        room.preshare_room_key().await.unwrap();
4737
4738        // Step 2, force lock invalidation by pretending another client obtained the
4739        // lock.
4740        {
4741            let client = Client::builder()
4742                .homeserver_url("http://localhost:1234")
4743                .request_config(RequestConfig::new().disable_retry())
4744                .sqlite_store(&sqlite_path, None)
4745                .build()
4746                .await
4747                .unwrap();
4748            client
4749                .matrix_auth()
4750                .restore_session(session.clone(), RoomLoadSettings::default())
4751                .await
4752                .unwrap();
4753            client
4754                .encryption()
4755                .enable_cross_process_store_lock("client2".to_owned())
4756                .await
4757                .unwrap();
4758
4759            let guard = client.encryption().spin_lock_store(None).await.unwrap();
4760            assert!(guard.is_some());
4761        }
4762
4763        // Step 3, take the crypto-store lock.
4764        let guard = client.encryption().spin_lock_store(None).await.unwrap();
4765        assert!(guard.is_some());
4766
4767        // Step 4, try to encrypt a message.
4768        let olm = client.olm_machine().await;
4769        let olm = olm.as_ref().expect("Olm machine wasn't started");
4770
4771        // Now pretend we're encrypting an event; the olm machine shouldn't rely on
4772        // caching the outgoing session before.
4773        let _encrypted_content = olm
4774            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4775            .await
4776            .unwrap();
4777    }
4778
4779    #[test]
4780    fn reported_content_score() {
4781        // i8
4782        let score = ReportedContentScore::new(0).unwrap();
4783        assert_eq!(score.value(), 0);
4784        let score = ReportedContentScore::new(-50).unwrap();
4785        assert_eq!(score.value(), -50);
4786        let score = ReportedContentScore::new(-100).unwrap();
4787        assert_eq!(score.value(), -100);
4788        assert_eq!(ReportedContentScore::new(10), None);
4789        assert_eq!(ReportedContentScore::new(-110), None);
4790
4791        let score = ReportedContentScore::new_saturating(0);
4792        assert_eq!(score.value(), 0);
4793        let score = ReportedContentScore::new_saturating(-50);
4794        assert_eq!(score.value(), -50);
4795        let score = ReportedContentScore::new_saturating(-100);
4796        assert_eq!(score.value(), -100);
4797        let score = ReportedContentScore::new_saturating(10);
4798        assert_eq!(score, ReportedContentScore::MAX);
4799        let score = ReportedContentScore::new_saturating(-110);
4800        assert_eq!(score, ReportedContentScore::MIN);
4801
4802        // i16
4803        let score = ReportedContentScore::try_from(0i16).unwrap();
4804        assert_eq!(score.value(), 0);
4805        let score = ReportedContentScore::try_from(-100i16).unwrap();
4806        assert_eq!(score.value(), -100);
4807        ReportedContentScore::try_from(10i16).unwrap_err();
4808        ReportedContentScore::try_from(-110i16).unwrap_err();
4809
4810        // i32
4811        let score = ReportedContentScore::try_from(0i32).unwrap();
4812        assert_eq!(score.value(), 0);
4813        let score = ReportedContentScore::try_from(-100i32).unwrap();
4814        assert_eq!(score.value(), -100);
4815        ReportedContentScore::try_from(10i32).unwrap_err();
4816        ReportedContentScore::try_from(-110i32).unwrap_err();
4817
4818        // i64
4819        let score = ReportedContentScore::try_from(0i64).unwrap();
4820        assert_eq!(score.value(), 0);
4821        let score = ReportedContentScore::try_from(-100i64).unwrap();
4822        assert_eq!(score.value(), -100);
4823        ReportedContentScore::try_from(10i64).unwrap_err();
4824        ReportedContentScore::try_from(-110i64).unwrap_err();
4825
4826        // Int
4827        let score = ReportedContentScore::try_from(int!(0)).unwrap();
4828        assert_eq!(score.value(), 0);
4829        let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4830        assert_eq!(score.value(), -100);
4831        ReportedContentScore::try_from(int!(10)).unwrap_err();
4832        ReportedContentScore::try_from(int!(-110)).unwrap_err();
4833    }
4834
4835    #[async_test]
4836    async fn test_composer_draft() {
4837        use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4838
4839        let client = logged_in_client(None).await;
4840
4841        let response = SyncResponseBuilder::default()
4842            .add_joined_room(JoinedRoomBuilder::default())
4843            .build_sync_response();
4844        client.base_client().receive_sync_response(response).await.unwrap();
4845        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4846
4847        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4848
4849        // Save 2 drafts, one for the room and one for a thread.
4850
4851        let draft = ComposerDraft {
4852            plain_text: "Hello, world!".to_owned(),
4853            html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4854            draft_type: ComposerDraftType::NewMessage,
4855        };
4856
4857        room.save_composer_draft(draft.clone(), None).await.unwrap();
4858
4859        let thread_root = owned_event_id!("$thread_root:b.c");
4860        let thread_draft = ComposerDraft {
4861            plain_text: "Hello, thread!".to_owned(),
4862            html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4863            draft_type: ComposerDraftType::NewMessage,
4864        };
4865
4866        room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4867
4868        // Check that the room draft was saved correctly
4869        assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4870
4871        // Check that the thread draft was saved correctly
4872        assert_eq!(
4873            room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4874            Some(thread_draft.clone())
4875        );
4876
4877        // Clear the room draft
4878        room.clear_composer_draft(None).await.unwrap();
4879        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4880
4881        // Check that the thread one is still there
4882        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4883
4884        // Clear the thread draft as well
4885        room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4886        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4887    }
4888
4889    #[async_test]
4890    async fn test_mark_join_requests_as_seen() {
4891        let server = MatrixMockServer::new().await;
4892        let client = server.client_builder().build().await;
4893        let event_id = event_id!("$a:b.c");
4894        let room_id = room_id!("!a:b.c");
4895        let user_id = user_id!("@alice:b.c");
4896
4897        let f = EventFactory::new().room(room_id);
4898        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4899            f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4900        ]);
4901        let room = server.sync_room(&client, joined_room_builder).await;
4902
4903        // When loading the initial seen ids, there are none
4904        let seen_ids =
4905            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4906        assert!(seen_ids.is_empty());
4907
4908        // We mark a random event id as seen
4909        room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4910            .await
4911            .expect("Couldn't mark join request as seen");
4912
4913        // Then we can check it was successfully marked as seen
4914        let seen_ids =
4915            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4916        assert_eq!(seen_ids.len(), 1);
4917        assert_eq!(
4918            seen_ids.into_iter().next().expect("No next value"),
4919            (event_id.to_owned(), user_id.to_owned())
4920        )
4921    }
4922
4923    #[async_test]
4924    async fn test_own_room_membership_with_no_own_member_event() {
4925        let server = MatrixMockServer::new().await;
4926        let client = server.client_builder().build().await;
4927        let room_id = room_id!("!a:b.c");
4928
4929        let room = server.sync_joined_room(&client, room_id).await;
4930
4931        // Since there is no member event for the own user, the method fails.
4932        // This should never happen in an actual room.
4933        let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4934        assert!(error.is_some());
4935    }
4936
4937    #[async_test]
4938    async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4939        let server = MatrixMockServer::new().await;
4940        let client = server.client_builder().build().await;
4941        let room_id = room_id!("!a:b.c");
4942        let user_id = user_id!("@example:localhost");
4943
4944        let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4945        let joined_room_builder =
4946            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
4947        let room = server.sync_room(&client, joined_room_builder).await;
4948
4949        // When we load the membership details
4950        let ret = room
4951            .member_with_sender_info(client.user_id().unwrap())
4952            .await
4953            .expect("Room member info should be available");
4954
4955        // We get the member info for the current user
4956        assert_eq!(ret.room_member.event().user_id(), user_id);
4957
4958        // But there is no info for the sender
4959        assert!(ret.sender_info.is_none());
4960    }
4961
4962    #[async_test]
4963    async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4964        let server = MatrixMockServer::new().await;
4965        let client = server.client_builder().build().await;
4966        let room_id = room_id!("!a:b.c");
4967        let user_id = user_id!("@example:localhost");
4968
4969        let f = EventFactory::new().room(room_id).sender(user_id);
4970        let joined_room_builder =
4971            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
4972        let room = server.sync_room(&client, joined_room_builder).await;
4973
4974        // When we load the membership details
4975        let ret = room
4976            .member_with_sender_info(client.user_id().unwrap())
4977            .await
4978            .expect("Room member info should be available");
4979
4980        // We get the current user's member info
4981        assert_eq!(ret.room_member.event().user_id(), user_id);
4982
4983        // And the sender has the same info, since it's also the current user
4984        assert!(ret.sender_info.is_some());
4985        assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
4986    }
4987
4988    #[async_test]
4989    async fn test_own_room_membership_with_own_member_event_and_known_sender() {
4990        let server = MatrixMockServer::new().await;
4991        let client = server.client_builder().build().await;
4992        let room_id = room_id!("!a:b.c");
4993        let user_id = user_id!("@example:localhost");
4994        let sender_id = user_id!("@alice:b.c");
4995
4996        let f = EventFactory::new().room(room_id).sender(sender_id);
4997        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4998            f.member(user_id).into(),
4999            // The sender info comes from the sync
5000            f.member(sender_id).into(),
5001        ]);
5002        let room = server.sync_room(&client, joined_room_builder).await;
5003
5004        // When we load the membership details
5005        let ret = room
5006            .member_with_sender_info(client.user_id().unwrap())
5007            .await
5008            .expect("Room member info should be available");
5009
5010        // We get the current user's member info
5011        assert_eq!(ret.room_member.event().user_id(), user_id);
5012
5013        // And also the sender info from the events received in the sync
5014        assert!(ret.sender_info.is_some());
5015        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5016    }
5017
5018    #[async_test]
5019    async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5020        let server = MatrixMockServer::new().await;
5021        let client = server.client_builder().build().await;
5022        let room_id = room_id!("!a:b.c");
5023        let user_id = user_id!("@example:localhost");
5024        let sender_id = user_id!("@alice:b.c");
5025
5026        let f = EventFactory::new().room(room_id).sender(sender_id);
5027        let joined_room_builder =
5028            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5029        let room = server.sync_room(&client, joined_room_builder).await;
5030
5031        // We'll receive the member info through the /members endpoint
5032        server
5033            .mock_get_members()
5034            .ok(vec![f.member(sender_id).into_raw()])
5035            .mock_once()
5036            .mount()
5037            .await;
5038
5039        // We get the current user's member info
5040        let ret = room
5041            .member_with_sender_info(client.user_id().unwrap())
5042            .await
5043            .expect("Room member info should be available");
5044
5045        // We get the current user's member info
5046        assert_eq!(ret.room_member.event().user_id(), user_id);
5047
5048        // And also the sender info from the /members endpoint
5049        assert!(ret.sender_info.is_some());
5050        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5051    }
5052
5053    #[async_test]
5054    async fn test_list_threads() {
5055        let server = MatrixMockServer::new().await;
5056        let client = server.client_builder().build().await;
5057
5058        let room_id = room_id!("!a:b.c");
5059        let sender_id = user_id!("@alice:b.c");
5060        let f = EventFactory::new().room(room_id).sender(sender_id);
5061
5062        let eid1 = event_id!("$1");
5063        let eid2 = event_id!("$2");
5064        let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5065        let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5066
5067        server
5068            .mock_room_threads()
5069            .ok(batch1.clone(), Some("prev_batch".to_owned()))
5070            .mock_once()
5071            .mount()
5072            .await;
5073        server
5074            .mock_room_threads()
5075            .match_from("prev_batch")
5076            .ok(batch2, None)
5077            .mock_once()
5078            .mount()
5079            .await;
5080
5081        let room = server.sync_joined_room(&client, room_id).await;
5082        let result =
5083            room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5084        assert_eq!(result.chunk.len(), 1);
5085        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5086        assert!(result.prev_batch_token.is_some());
5087
5088        let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5089        let result = room.list_threads(opts).await.expect("Failed to list threads");
5090        assert_eq!(result.chunk.len(), 1);
5091        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5092        assert!(result.prev_batch_token.is_none());
5093    }
5094
5095    #[async_test]
5096    async fn test_relations() {
5097        let server = MatrixMockServer::new().await;
5098        let client = server.client_builder().build().await;
5099
5100        let room_id = room_id!("!a:b.c");
5101        let sender_id = user_id!("@alice:b.c");
5102        let f = EventFactory::new().room(room_id).sender(sender_id);
5103
5104        let target_event_id = owned_event_id!("$target");
5105        let eid1 = event_id!("$1");
5106        let eid2 = event_id!("$2");
5107        let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5108        let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5109
5110        server
5111            .mock_room_relations()
5112            .match_target_event(target_event_id.clone())
5113            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5114            .mock_once()
5115            .mount()
5116            .await;
5117
5118        server
5119            .mock_room_relations()
5120            .match_target_event(target_event_id.clone())
5121            .match_from("next_batch")
5122            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5123            .mock_once()
5124            .mount()
5125            .await;
5126
5127        let room = server.sync_joined_room(&client, room_id).await;
5128
5129        // Main endpoint: no relation type filtered out.
5130        let mut opts = RelationsOptions {
5131            include_relations: IncludeRelations::AllRelations,
5132            ..Default::default()
5133        };
5134        let result = room
5135            .relations(target_event_id.clone(), opts.clone())
5136            .await
5137            .expect("Failed to list relations the first time");
5138        assert_eq!(result.chunk.len(), 1);
5139        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5140        assert!(result.prev_batch_token.is_none());
5141        assert!(result.next_batch_token.is_some());
5142        assert!(result.recursion_depth.is_none());
5143
5144        opts.from = result.next_batch_token;
5145        let result = room
5146            .relations(target_event_id, opts)
5147            .await
5148            .expect("Failed to list relations the second time");
5149        assert_eq!(result.chunk.len(), 1);
5150        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5151        assert!(result.prev_batch_token.is_none());
5152        assert!(result.next_batch_token.is_none());
5153        assert!(result.recursion_depth.is_none());
5154    }
5155
5156    #[async_test]
5157    async fn test_relations_with_reltype() {
5158        let server = MatrixMockServer::new().await;
5159        let client = server.client_builder().build().await;
5160
5161        let room_id = room_id!("!a:b.c");
5162        let sender_id = user_id!("@alice:b.c");
5163        let f = EventFactory::new().room(room_id).sender(sender_id);
5164
5165        let target_event_id = owned_event_id!("$target");
5166        let eid1 = event_id!("$1");
5167        let eid2 = event_id!("$2");
5168        let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5169        let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5170
5171        server
5172            .mock_room_relations()
5173            .match_target_event(target_event_id.clone())
5174            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5175            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5176            .mock_once()
5177            .mount()
5178            .await;
5179
5180        server
5181            .mock_room_relations()
5182            .match_target_event(target_event_id.clone())
5183            .match_from("next_batch")
5184            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5185            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5186            .mock_once()
5187            .mount()
5188            .await;
5189
5190        let room = server.sync_joined_room(&client, room_id).await;
5191
5192        // Reltype-filtered endpoint, for threads \o/
5193        let mut opts = RelationsOptions {
5194            include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5195            ..Default::default()
5196        };
5197        let result = room
5198            .relations(target_event_id.clone(), opts.clone())
5199            .await
5200            .expect("Failed to list relations the first time");
5201        assert_eq!(result.chunk.len(), 1);
5202        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5203        assert!(result.prev_batch_token.is_none());
5204        assert!(result.next_batch_token.is_some());
5205        assert!(result.recursion_depth.is_none());
5206
5207        opts.from = result.next_batch_token;
5208        let result = room
5209            .relations(target_event_id, opts)
5210            .await
5211            .expect("Failed to list relations the second time");
5212        assert_eq!(result.chunk.len(), 1);
5213        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5214        assert!(result.prev_batch_token.is_none());
5215        assert!(result.next_batch_token.is_none());
5216        assert!(result.recursion_depth.is_none());
5217    }
5218
5219    #[async_test]
5220    async fn test_power_levels_computation() {
5221        let server = MatrixMockServer::new().await;
5222        let client = server.client_builder().build().await;
5223
5224        let room_id = room_id!("!a:b.c");
5225        let sender_id = client.user_id().expect("No session id");
5226        let f = EventFactory::new().room(room_id).sender(sender_id);
5227        let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5228
5229        // Computing the power levels will need these 3 state events:
5230        let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5231        let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5232        let room_member_event = f.member(sender_id).into();
5233
5234        // With only the room member event
5235        let room = server
5236            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5237            .await;
5238        let ctx = room
5239            .push_condition_room_ctx()
5240            .await
5241            .expect("Failed to get push condition context")
5242            .expect("Could not get push condition context");
5243
5244        // The internal power levels couldn't be computed
5245        assert!(ctx.power_levels.is_none());
5246
5247        // Adding the room creation event
5248        let room = server
5249            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5250            .await;
5251        let ctx = room
5252            .push_condition_room_ctx()
5253            .await
5254            .expect("Failed to get push condition context")
5255            .expect("Could not get push condition context");
5256
5257        // The internal power levels still couldn't be computed
5258        assert!(ctx.power_levels.is_none());
5259
5260        // With the room member, room creation and the power levels events
5261        let room = server
5262            .sync_room(
5263                &client,
5264                JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5265            )
5266            .await;
5267        let ctx = room
5268            .push_condition_room_ctx()
5269            .await
5270            .expect("Failed to get push condition context")
5271            .expect("Could not get push condition context");
5272
5273        // The internal power levels can finally be computed
5274        assert!(ctx.power_levels.is_some());
5275    }
5276}