matrix_sdk/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18    borrow::Borrow,
19    collections::{BTreeMap, HashMap},
20    future::Future,
21    ops::Deref,
22    sync::Arc,
23    time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30    StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
39pub use matrix_sdk_base::store::StoredThreadSubscription;
40use matrix_sdk_base::{
41    ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
42    StateChanges, StateStoreDataKey, StateStoreDataValue,
43    deserialized_responses::{
44        RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
45    },
46    media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
47    store::{StateStoreExt, ThreadSubscriptionStatus},
48};
49#[cfg(feature = "e2e-encryption")]
50use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
51#[cfg(feature = "e2e-encryption")]
52use matrix_sdk_common::BoxFuture;
53use matrix_sdk_common::{
54    deserialized_responses::TimelineEvent,
55    executor::{JoinHandle, spawn},
56    timeout::timeout,
57};
58#[cfg(feature = "experimental-search")]
59use matrix_sdk_search::error::IndexError;
60#[cfg(feature = "experimental-search")]
61#[cfg(doc)]
62use matrix_sdk_search::index::RoomIndex;
63use mime::Mime;
64use reply::Reply;
65#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
66use ruma::events::AnySyncMessageLikeEvent;
67#[cfg(feature = "experimental-encrypted-state-events")]
68use ruma::events::AnySyncStateEvent;
69#[cfg(feature = "unstable-msc4274")]
70use ruma::events::room::message::GalleryItemType;
71#[cfg(feature = "e2e-encryption")]
72use ruma::events::{
73    AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
74};
75use ruma::{
76    EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
77    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
78    api::client::{
79        config::{set_global_account_data, set_room_account_data},
80        context,
81        error::ErrorKind,
82        filter::LazyLoadOptions,
83        membership::{
84            Invite3pid, ban_user, forget_room, get_member_events,
85            invite_user::{self, v3::InvitationRecipient},
86            kick_user, leave_room, unban_user,
87        },
88        message::send_message_event,
89        read_marker::set_read_marker,
90        receipt::create_receipt,
91        redact::redact_event,
92        room::{get_room_event, report_content, report_room},
93        state::{get_state_event_for_key, send_state_event},
94        tag::{create_tag, delete_tag},
95        threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
96        typing::create_typing_event::{self, v3::Typing},
97    },
98    assign,
99    events::{
100        AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
101        Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
102        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
103        RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
104        StaticStateEventContent, SyncStateEvent,
105        beacon::BeaconEventContent,
106        beacon_info::BeaconInfoEventContent,
107        direct::DirectEventContent,
108        marked_unread::MarkedUnreadEventContent,
109        receipt::{Receipt, ReceiptThread, ReceiptType},
110        room::{
111            ImageInfo, MediaSource, ThumbnailInfo,
112            avatar::{self, RoomAvatarEventContent},
113            encryption::RoomEncryptionEventContent,
114            history_visibility::HistoryVisibility,
115            member::{MembershipChange, SyncRoomMemberEvent},
116            message::{
117                AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
118                FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
119                UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo,
120                VideoMessageEventContent,
121            },
122            name::RoomNameEventContent,
123            pinned_events::RoomPinnedEventsEventContent,
124            power_levels::{
125                RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
126            },
127            server_acl::RoomServerAclEventContent,
128            topic::RoomTopicEventContent,
129        },
130        space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
131        tag::{TagInfo, TagName},
132        typing::SyncTypingEvent,
133    },
134    int,
135    push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
136    serde::Raw,
137    time::Instant,
138};
139#[cfg(feature = "experimental-encrypted-state-events")]
140use ruma::{
141    events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
142    serde::JsonCastable,
143};
144use serde::de::DeserializeOwned;
145use thiserror::Error;
146use tokio::{join, sync::broadcast};
147use tracing::{debug, error, info, instrument, trace, warn};
148
149use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
150pub use self::{
151    member::{RoomMember, RoomMemberRole},
152    messages::{
153        EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
154        Relations, RelationsOptions, ThreadRoots,
155    },
156};
157#[cfg(doc)]
158use crate::event_cache::EventCache;
159#[cfg(feature = "experimental-encrypted-state-events")]
160use crate::room::futures::{SendRawStateEvent, SendStateEvent};
161use crate::{
162    BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
163    attachment::{AttachmentConfig, AttachmentInfo},
164    client::WeakClient,
165    config::RequestConfig,
166    error::{BeaconError, WrongRoomState},
167    event_cache::{self, EventCacheDropHandles, RoomEventCache},
168    event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
169    live_location_share::ObservableLiveLocation,
170    media::{MediaFormat, MediaRequestParameters},
171    notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
172    room::{
173        knock_requests::{KnockRequest, KnockRequestMemberInfo},
174        power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
175        privacy_settings::RoomPrivacySettings,
176    },
177    sync::RoomUpdate,
178    utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
179};
180#[cfg(feature = "e2e-encryption")]
181use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
182
183pub mod edit;
184pub mod futures;
185pub mod identity_status_changes;
186/// Contains code related to requests to join a room.
187pub mod knock_requests;
188mod member;
189mod messages;
190pub mod power_levels;
191pub mod reply;
192
193pub mod calls;
194
195/// Contains all the functionality for modifying the privacy settings in a room.
196pub mod privacy_settings;
197
198#[cfg(feature = "e2e-encryption")]
199pub(crate) mod shared_room_history;
200
201/// A struct containing methods that are common for Joined, Invited and Left
202/// Rooms
203#[derive(Debug, Clone)]
204pub struct Room {
205    inner: BaseRoom,
206    pub(crate) client: Client,
207}
208
209impl Deref for Room {
210    type Target = BaseRoom;
211
212    fn deref(&self) -> &Self::Target {
213        &self.inner
214    }
215}
216
217const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
218const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
219
220/// A thread subscription, according to the semantics of MSC4306.
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub struct ThreadSubscription {
223    /// Whether the subscription was made automatically by a client, not by
224    /// manual user choice.
225    pub automatic: bool,
226}
227
228/// Context allowing to compute the push actions for a given event.
229#[derive(Debug)]
230pub struct PushContext {
231    /// The Ruma context used to compute the push actions.
232    push_condition_room_ctx: PushConditionRoomCtx,
233
234    /// Push rules for this room, based on the push rules state event, or the
235    /// global server default as defined by [`Ruleset::server_default`].
236    push_rules: Ruleset,
237}
238
239impl PushContext {
240    /// Create a new [`PushContext`] from its inner components.
241    pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
242        Self { push_condition_room_ctx, push_rules }
243    }
244
245    /// Compute the push rules for a given event.
246    pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
247        self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
248    }
249
250    /// Compute the push rules for a given event, with extra logging to help
251    /// debugging.
252    #[doc(hidden)]
253    #[instrument(skip_all)]
254    pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
255        let rules = self
256            .push_rules
257            .iter()
258            .filter_map(|r| {
259                if !r.enabled() {
260                    return None;
261                }
262
263                let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
264
265                let conditions = match r {
266                    AnyPushRuleRef::Override(r) => {
267                        format!("{:?}", r.conditions)
268                    }
269                    AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
270                    AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
271                    AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
272                    AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
273                    _ => "<unknown push rule kind>".to_owned(),
274                };
275
276                Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
277            })
278            .collect::<Vec<_>>()
279            .join("\n");
280        trace!("rules:\n\n{rules}\n\n");
281
282        let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
283
284        if let Some(found) = found {
285            trace!("rule {} matched", found.rule_id());
286            found.actions().to_owned()
287        } else {
288            trace!("no match");
289            Vec::new()
290        }
291    }
292}
293
294macro_rules! make_media_type {
295    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $formatted_caption: ident, $info: ident, $thumbnail: ident) => {{
296        // If caption is set, use it as body, and filename as the file name; otherwise,
297        // body is the filename, and the filename is not set.
298        // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
299        let (body, filename) = match $caption {
300            Some(caption) => (caption, Some($filename)),
301            None => ($filename, None),
302        };
303
304        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
305
306        match $content_type.type_() {
307            mime::IMAGE => {
308                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
309                    mimetype: Some($content_type.as_ref().to_owned()),
310                    thumbnail_source,
311                    thumbnail_info
312                });
313                let content = assign!(ImageMessageEventContent::new(body, $source), {
314                    info: Some(Box::new(info)),
315                    formatted: $formatted_caption,
316                    filename
317                });
318                <$t>::Image(content)
319            }
320
321            mime::AUDIO => {
322                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
323                    formatted: $formatted_caption,
324                    filename
325                });
326
327                if let Some(AttachmentInfo::Voice { audio_info, waveform: Some(waveform_vec) }) =
328                    &$info
329                {
330                    if let Some(duration) = audio_info.duration {
331                        let waveform = waveform_vec.iter().map(|v| (*v).into()).collect();
332                        content.audio =
333                            Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
334                    }
335                    content.voice = Some(UnstableVoiceContentBlock::new());
336                }
337
338                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
339                audio_info.mimetype = Some($content_type.as_ref().to_owned());
340                let content = content.info(Box::new(audio_info));
341
342                <$t>::Audio(content)
343            }
344
345            mime::VIDEO => {
346                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
347                    mimetype: Some($content_type.as_ref().to_owned()),
348                    thumbnail_source,
349                    thumbnail_info
350                });
351                let content = assign!(VideoMessageEventContent::new(body, $source), {
352                    info: Some(Box::new(info)),
353                    formatted: $formatted_caption,
354                    filename
355                });
356                <$t>::Video(content)
357            }
358
359            _ => {
360                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
361                    mimetype: Some($content_type.as_ref().to_owned()),
362                    thumbnail_source,
363                    thumbnail_info
364                });
365                let content = assign!(FileMessageEventContent::new(body, $source), {
366                    info: Some(Box::new(info)),
367                    formatted: $formatted_caption,
368                    filename,
369                });
370                <$t>::File(content)
371            }
372        }
373    }};
374}
375
376impl Room {
377    /// Create a new `Room`
378    ///
379    /// # Arguments
380    /// * `client` - The client used to make requests.
381    ///
382    /// * `room` - The underlying room.
383    pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
384        Self { inner: room, client }
385    }
386
387    /// Leave this room.
388    /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
389    /// automatically.
390    ///
391    /// Only invited and joined rooms can be left.
392    #[doc(alias = "reject_invitation")]
393    #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
394    async fn leave_impl(&self) -> (Result<()>, &Room) {
395        let state = self.state();
396        if state == RoomState::Left {
397            return (
398                Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
399                    "Joined or Invited",
400                    state,
401                )))),
402                self,
403            );
404        }
405
406        // If the room was in Invited state we should also forget it when declining the
407        // invite.
408        let should_forget = matches!(self.state(), RoomState::Invited);
409
410        let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
411        let response = self.client.send(request).await;
412
413        // The server can return with an error that is acceptable to ignore. Let's find
414        // which one.
415        if let Err(error) = response {
416            #[allow(clippy::collapsible_match)]
417            let ignore_error = if let Some(error) = error.client_api_error_kind() {
418                match error {
419                    // The user is trying to leave a room but doesn't have permissions to do so.
420                    // Let's consider the user has left the room.
421                    ErrorKind::Forbidden { .. } => true,
422                    _ => false,
423                }
424            } else {
425                false
426            };
427
428            error!(?error, ignore_error, should_forget, "Failed to leave the room");
429
430            if !ignore_error {
431                return (Err(error.into()), self);
432            }
433        }
434
435        if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
436            return (Err(e.into()), self);
437        }
438
439        if should_forget {
440            trace!("Trying to forget the room");
441
442            if let Err(error) = self.forget().await {
443                error!(?error, "Failed to forget the room");
444            }
445        }
446
447        (Ok(()), self)
448    }
449
450    /// Leave this room and all predecessors.
451    /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
452    /// automatically.
453    ///
454    /// Only invited and joined rooms can be left.
455    /// Will return an error if the current room fails to leave but
456    /// will only warn if a predecessor fails to leave.
457    pub async fn leave(&self) -> Result<()> {
458        let mut rooms: Vec<Room> = vec![self.clone()];
459        let mut current_room = self;
460
461        while let Some(predecessor) = current_room.predecessor_room() {
462            let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
463
464            if let Some(predecessor_room) = maybe_predecessor_room {
465                rooms.push(predecessor_room.clone());
466                current_room = rooms.last().expect("Room just pushed so can't be empty");
467            } else {
468                warn!("Cannot find predecessor room");
469                break;
470            }
471        }
472
473        let batch_size = 5;
474
475        let rooms_futures: Vec<_> = rooms
476            .iter()
477            .filter_map(|room| match room.state() {
478                RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
479                    Some(room.leave_impl())
480                }
481                RoomState::Banned | RoomState::Left => None,
482            })
483            .collect();
484
485        let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
486
487        let mut maybe_this_room_failed_with: Option<Error> = None;
488
489        while let Some(result) = futures_stream.next().await {
490            if let (Err(e), room) = result {
491                if room.room_id() == self.room_id() {
492                    maybe_this_room_failed_with = Some(e);
493                } else {
494                    warn!("Failure while attempting to leave predecessor room: {e:?}");
495                }
496            }
497        }
498
499        maybe_this_room_failed_with.map_or(Ok(()), Err)
500    }
501
502    /// Join this room.
503    ///
504    /// Only invited and left rooms can be joined via this method.
505    #[doc(alias = "accept_invitation")]
506    pub async fn join(&self) -> Result<()> {
507        let prev_room_state = self.inner.state();
508
509        if prev_room_state == RoomState::Joined {
510            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
511                "Invited or Left",
512                prev_room_state,
513            ))));
514        }
515
516        self.client.join_room_by_id(self.room_id()).await?;
517
518        Ok(())
519    }
520
521    /// Get the inner client saved in this room instance.
522    ///
523    /// Returns the client this room is part of.
524    pub fn client(&self) -> Client {
525        self.client.clone()
526    }
527
528    /// Get the sync state of this room, i.e. whether it was fully synced with
529    /// the server.
530    pub fn is_synced(&self) -> bool {
531        self.inner.is_state_fully_synced()
532    }
533
534    /// Gets the avatar of this room, if set.
535    ///
536    /// Returns the avatar.
537    /// If a thumbnail is requested no guarantee on the size of the image is
538    /// given.
539    ///
540    /// # Arguments
541    ///
542    /// * `format` - The desired format of the avatar.
543    ///
544    /// # Examples
545    ///
546    /// ```no_run
547    /// # use matrix_sdk::Client;
548    /// # use matrix_sdk::ruma::room_id;
549    /// # use matrix_sdk::media::MediaFormat;
550    /// # use url::Url;
551    /// # let homeserver = Url::parse("http://example.com").unwrap();
552    /// # async {
553    /// # let user = "example";
554    /// let client = Client::new(homeserver).await.unwrap();
555    /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
556    /// let room_id = room_id!("!roomid:example.com");
557    /// let room = client.get_room(&room_id).unwrap();
558    /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
559    ///     std::fs::write("avatar.png", avatar);
560    /// }
561    /// # };
562    /// ```
563    pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
564        let Some(url) = self.avatar_url() else { return Ok(None) };
565        let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
566        Ok(Some(self.client.media().get_media_content(&request, true).await?))
567    }
568
569    /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
570    /// returns a `Messages` struct that contains a chunk of room and state
571    /// events (`RoomEvent` and `AnyStateEvent`).
572    ///
573    /// With the encryption feature, messages are decrypted if possible. If
574    /// decryption fails for an individual message, that message is returned
575    /// undecrypted.
576    ///
577    /// # Examples
578    ///
579    /// ```no_run
580    /// use matrix_sdk::{Client, room::MessagesOptions};
581    /// # use matrix_sdk::ruma::{
582    /// #     api::client::filter::RoomEventFilter,
583    /// #     room_id,
584    /// # };
585    /// # use url::Url;
586    ///
587    /// # let homeserver = Url::parse("http://example.com").unwrap();
588    /// # async {
589    /// let options =
590    ///     MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
591    ///
592    /// let mut client = Client::new(homeserver).await.unwrap();
593    /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
594    /// assert!(room.messages(options).await.is_ok());
595    /// # };
596    /// ```
597    #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
598    pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
599        let room_id = self.inner.room_id();
600        let request = options.into_request(room_id);
601        let http_response = self.client.send(request).await?;
602
603        let push_ctx = self.push_context().await?;
604        let chunk = join_all(
605            http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
606        )
607        .await;
608
609        Ok(Messages {
610            start: http_response.start,
611            end: http_response.end,
612            chunk,
613            state: http_response.state,
614        })
615    }
616
617    /// Register a handler for events of a specific type, within this room.
618    ///
619    /// This method works the same way as [`Client::add_event_handler`], except
620    /// that the handler will only be called for events within this room. See
621    /// that method for more details on event handler functions.
622    ///
623    /// `room.add_event_handler(hdl)` is equivalent to
624    /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
625    /// convenient in your use case.
626    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
627    where
628        Ev: SyncEvent + DeserializeOwned + Send + 'static,
629        H: EventHandler<Ev, Ctx>,
630    {
631        self.client.add_room_event_handler(self.room_id(), handler)
632    }
633
634    /// Subscribe to all updates for this room.
635    ///
636    /// The returned receiver will receive a new message for each sync response
637    /// that contains updates for this room.
638    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
639        self.client.subscribe_to_room_updates(self.room_id())
640    }
641
642    /// Subscribe to typing notifications for this room.
643    ///
644    /// The returned receiver will receive a new vector of user IDs for each
645    /// sync response that contains 'm.typing' event. The current user ID will
646    /// be filtered out.
647    pub fn subscribe_to_typing_notifications(
648        &self,
649    ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
650        let (sender, receiver) = broadcast::channel(16);
651        let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
652            let own_user_id = self.own_user_id().to_owned();
653            move |event: SyncTypingEvent| async move {
654                // Ignore typing notifications from own user.
655                let typing_user_ids = event
656                    .content
657                    .user_ids
658                    .into_iter()
659                    .filter(|user_id| *user_id != own_user_id)
660                    .collect();
661                // Ignore the result. It can only fail if there are no listeners.
662                let _ = sender.send(typing_user_ids);
663            }
664        });
665        let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
666        (drop_guard, receiver)
667    }
668
669    /// Subscribe to updates about users who are in "pin violation" i.e. their
670    /// identity has changed and the user has not yet acknowledged this.
671    ///
672    /// The returned receiver will receive a new vector of
673    /// [`IdentityStatusChange`] each time a /keys/query response shows a
674    /// changed identity for a member of this room, or a sync shows a change
675    /// to the membership of an affected user. (Changes to the current user are
676    /// not directly included, but some changes to the current user's identity
677    /// can trigger changes to how we see other users' identities, which
678    /// will be included.)
679    ///
680    /// The first item in the stream provides the current state of the room:
681    /// each member of the room who is not in "pinned" or "verified" state will
682    /// be included (except the current user).
683    ///
684    /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
685    /// `PinViolation` then a warning should be displayed to the user. If it is
686    /// set to `Pinned` then no warning should be displayed.
687    ///
688    /// Note that if a user who is in pin violation leaves the room, a `Pinned`
689    /// update is sent, to indicate that the warning should be removed, even
690    /// though the user's identity is not necessarily pinned.
691    #[cfg(feature = "e2e-encryption")]
692    pub async fn subscribe_to_identity_status_changes(
693        &self,
694    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
695        IdentityStatusChanges::create_stream(self.clone()).await
696    }
697
698    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
699    /// decrypted if needs be.
700    ///
701    /// Only logs from the crypto crate will indicate a failure to decrypt.
702    #[cfg(not(feature = "experimental-encrypted-state-events"))]
703    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
704    async fn try_decrypt_event(
705        &self,
706        event: Raw<AnyTimelineEvent>,
707        push_ctx: Option<&PushContext>,
708    ) -> TimelineEvent {
709        #[cfg(feature = "e2e-encryption")]
710        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
711            SyncMessageLikeEvent::Original(_),
712        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
713            && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
714        {
715            return event;
716        }
717
718        let mut event = TimelineEvent::from_plaintext(event.cast());
719        if let Some(push_ctx) = push_ctx {
720            event.set_push_actions(push_ctx.for_event(event.raw()).await);
721        }
722
723        event
724    }
725
726    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
727    /// decrypted if needs be.
728    ///
729    /// Only logs from the crypto crate will indicate a failure to decrypt.
730    #[cfg(feature = "experimental-encrypted-state-events")]
731    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
732    async fn try_decrypt_event(
733        &self,
734        event: Raw<AnyTimelineEvent>,
735        push_ctx: Option<&PushContext>,
736    ) -> TimelineEvent {
737        // If we have either an encrypted message-like or state event, try to decrypt.
738        match event.deserialize_as::<AnySyncTimelineEvent>() {
739            Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
740                SyncMessageLikeEvent::Original(_),
741            ))) => {
742                if let Ok(event) = self
743                    .decrypt_event(
744                        event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
745                        push_ctx,
746                    )
747                    .await
748                {
749                    return event;
750                }
751            }
752            Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
753                SyncStateEvent::Original(_),
754            ))) => {
755                if let Ok(event) = self
756                    .decrypt_event(
757                        event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
758                        push_ctx,
759                    )
760                    .await
761                {
762                    return event;
763                }
764            }
765            _ => {}
766        }
767
768        let mut event = TimelineEvent::from_plaintext(event.cast());
769        if let Some(push_ctx) = push_ctx {
770            event.set_push_actions(push_ctx.for_event(event.raw()).await);
771        }
772
773        event
774    }
775
776    /// Fetch the event with the given `EventId` in this room.
777    ///
778    /// It uses the given [`RequestConfig`] if provided, or the client's default
779    /// one otherwise.
780    pub async fn event(
781        &self,
782        event_id: &EventId,
783        request_config: Option<RequestConfig>,
784    ) -> Result<TimelineEvent> {
785        let request =
786            get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
787
788        let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
789        let push_ctx = self.push_context().await?;
790        let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
791
792        // Save the event into the event cache, if it's set up.
793        if let Ok((cache, _handles)) = self.event_cache().await {
794            cache.save_events([event.clone()]).await;
795        }
796
797        Ok(event)
798    }
799
800    /// Try to load the event from the [`EventCache`][crate::event_cache], if
801    /// it's enabled, or fetch it from the homeserver.
802    ///
803    /// When running the request against the homeserver, it uses the given
804    /// [`RequestConfig`] if provided, or the client's default one
805    /// otherwise.
806    pub async fn load_or_fetch_event(
807        &self,
808        event_id: &EventId,
809        request_config: Option<RequestConfig>,
810    ) -> Result<TimelineEvent> {
811        match self.event_cache().await {
812            Ok((event_cache, _drop_handles)) => {
813                if let Some(event) = event_cache.find_event(event_id).await {
814                    return Ok(event);
815                }
816                // Fallthrough: try with a request.
817            }
818            Err(err) => {
819                debug!("error when getting the event cache: {err}");
820            }
821        }
822        self.event(event_id, request_config).await
823    }
824
825    /// Fetch the event with the given `EventId` in this room, using the
826    /// `/context` endpoint to get more information.
827    pub async fn event_with_context(
828        &self,
829        event_id: &EventId,
830        lazy_load_members: bool,
831        context_size: UInt,
832        request_config: Option<RequestConfig>,
833    ) -> Result<EventWithContextResponse> {
834        let mut request =
835            context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
836
837        request.limit = context_size;
838
839        if lazy_load_members {
840            request.filter.lazy_load_options =
841                LazyLoadOptions::Enabled { include_redundant_members: false };
842        }
843
844        let response = self.client.send(request).with_request_config(request_config).await?;
845
846        let push_ctx = self.push_context().await?;
847        let push_ctx = push_ctx.as_ref();
848        let target_event = if let Some(event) = response.event {
849            Some(self.try_decrypt_event(event, push_ctx).await)
850        } else {
851            None
852        };
853
854        // Note: the joined future will fail if any future failed, but
855        // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
856        // decryption error, so we should prevent against most bad cases here.
857        let (events_before, events_after) = join!(
858            join_all(
859                response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
860            ),
861            join_all(
862                response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
863            ),
864        );
865
866        // Save the loaded events into the event cache, if it's set up.
867        if let Ok((cache, _handles)) = self.event_cache().await {
868            let mut events_to_save: Vec<TimelineEvent> = Vec::new();
869            if let Some(event) = &target_event {
870                events_to_save.push(event.clone());
871            }
872
873            for event in &events_before {
874                events_to_save.push(event.clone());
875            }
876
877            for event in &events_after {
878                events_to_save.push(event.clone());
879            }
880
881            cache.save_events(events_to_save).await;
882        }
883
884        Ok(EventWithContextResponse {
885            event: target_event,
886            events_before,
887            events_after,
888            state: response.state,
889            prev_batch_token: response.start,
890            next_batch_token: response.end,
891        })
892    }
893
894    pub(crate) async fn request_members(&self) -> Result<()> {
895        self.client
896            .locks()
897            .members_request_deduplicated_handler
898            .run(self.room_id().to_owned(), async move {
899                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
900                let response = self
901                    .client
902                    .send(request.clone())
903                    .with_request_config(
904                        // In some cases it can take longer than 30s to load:
905                        // https://github.com/element-hq/synapse/issues/16872
906                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
907                    )
908                    .await?;
909
910                // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
911                Box::pin(self.client.base_client().receive_all_members(
912                    self.room_id(),
913                    &request,
914                    &response,
915                ))
916                .await?;
917
918                Ok(())
919            })
920            .await
921    }
922
923    /// Request to update the encryption state for this room.
924    ///
925    /// It does nothing if the encryption state is already
926    /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
927    pub async fn request_encryption_state(&self) -> Result<()> {
928        if !self.inner.encryption_state().is_unknown() {
929            return Ok(());
930        }
931
932        self.client
933            .locks()
934            .encryption_state_deduplicated_handler
935            .run(self.room_id().to_owned(), async move {
936                // Request the event from the server.
937                let request = get_state_event_for_key::v3::Request::new(
938                    self.room_id().to_owned(),
939                    StateEventType::RoomEncryption,
940                    "".to_owned(),
941                );
942                let response = match self.client.send(request).await {
943                    Ok(response) => Some(
944                        response
945                            .into_content()
946                            .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
947                    ),
948                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
949                    Err(err) => return Err(err.into()),
950                };
951
952                let _sync_lock = self.client.base_client().sync_lock().lock().await;
953
954                // Persist the event and the fact that we requested it from the server in
955                // `RoomInfo`.
956                let mut room_info = self.clone_info();
957                room_info.mark_encryption_state_synced();
958                room_info.set_encryption_event(response.clone());
959                let mut changes = StateChanges::default();
960                changes.add_room(room_info.clone());
961
962                self.client.state_store().save_changes(&changes).await?;
963                self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
964
965                Ok(())
966            })
967            .await
968    }
969
970    /// Check the encryption state of this room.
971    ///
972    /// If the result is [`EncryptionState::Unknown`], one might want to call
973    /// [`Room::request_encryption_state`].
974    pub fn encryption_state(&self) -> EncryptionState {
975        self.inner.encryption_state()
976    }
977
978    /// Force to update the encryption state by calling
979    /// [`Room::request_encryption_state`], and then calling
980    /// [`Room::encryption_state`].
981    ///
982    /// This method is useful to ensure the encryption state is up-to-date.
983    pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
984        self.request_encryption_state().await?;
985
986        Ok(self.encryption_state())
987    }
988
989    /// Gets additional context info about the client crypto.
990    #[cfg(feature = "e2e-encryption")]
991    pub async fn crypto_context_info(&self) -> CryptoContextInfo {
992        let encryption = self.client.encryption();
993
994        let this_device_is_verified = match encryption.get_own_device().await {
995            Ok(Some(device)) => device.is_verified_with_cross_signing(),
996
997            // Should not happen, there will always be an own device
998            _ => true,
999        };
1000
1001        let backup_exists_on_server =
1002            encryption.backups().exists_on_server().await.unwrap_or(false);
1003
1004        CryptoContextInfo {
1005            device_creation_ts: encryption.device_creation_timestamp().await,
1006            this_device_is_verified,
1007            is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1008            backup_exists_on_server,
1009        }
1010    }
1011
1012    fn are_events_visible(&self) -> bool {
1013        if let RoomState::Invited = self.inner.state() {
1014            return matches!(
1015                self.inner.history_visibility_or_default(),
1016                HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1017            );
1018        }
1019
1020        true
1021    }
1022
1023    /// Sync the member list with the server.
1024    ///
1025    /// This method will de-duplicate requests if it is called multiple times in
1026    /// quick succession, in that case the return value will be `None`. This
1027    /// method does nothing if the members are already synced.
1028    pub async fn sync_members(&self) -> Result<()> {
1029        if !self.are_events_visible() {
1030            return Ok(());
1031        }
1032
1033        if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1034    }
1035
1036    /// Get a specific member of this room.
1037    ///
1038    /// *Note*: This method will fetch the members from the homeserver if the
1039    /// member list isn't synchronized due to member lazy loading. Because of
1040    /// that it might panic if it isn't run on a tokio thread.
1041    ///
1042    /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1043    /// method that doesn't do any requests.
1044    ///
1045    /// # Arguments
1046    ///
1047    /// * `user_id` - The ID of the user that should be fetched out of the
1048    ///   store.
1049    pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1050        self.sync_members().await?;
1051        self.get_member_no_sync(user_id).await
1052    }
1053
1054    /// Get a specific member of this room.
1055    ///
1056    /// *Note*: This method will not fetch the members from the homeserver if
1057    /// the member list isn't synchronized due to member lazy loading. Thus,
1058    /// members could be missing.
1059    ///
1060    /// Use [get_member()](#method.get_member) if you want to ensure to always
1061    /// have the full member list to chose from.
1062    ///
1063    /// # Arguments
1064    ///
1065    /// * `user_id` - The ID of the user that should be fetched out of the
1066    ///   store.
1067    pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1068        Ok(self
1069            .inner
1070            .get_member(user_id)
1071            .await?
1072            .map(|member| RoomMember::new(self.client.clone(), member)))
1073    }
1074
1075    /// Get members for this room, with the given memberships.
1076    ///
1077    /// *Note*: This method will fetch the members from the homeserver if the
1078    /// member list isn't synchronized due to member lazy loading. Because of
1079    /// that it might panic if it isn't run on a tokio thread.
1080    ///
1081    /// Use [members_no_sync()](#method.members_no_sync) if you want a
1082    /// method that doesn't do any requests.
1083    pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1084        self.sync_members().await?;
1085        self.members_no_sync(memberships).await
1086    }
1087
1088    /// Get members for this room, with the given memberships.
1089    ///
1090    /// *Note*: This method will not fetch the members from the homeserver if
1091    /// the member list isn't synchronized due to member lazy loading. Thus,
1092    /// members could be missing.
1093    ///
1094    /// Use [members()](#method.members) if you want to ensure to always get
1095    /// the full member list.
1096    pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1097        Ok(self
1098            .inner
1099            .members(memberships)
1100            .await?
1101            .into_iter()
1102            .map(|member| RoomMember::new(self.client.clone(), member))
1103            .collect())
1104    }
1105
1106    /// Get all state events of a given type in this room.
1107    pub async fn get_state_events(
1108        &self,
1109        event_type: StateEventType,
1110    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1111        self.client
1112            .state_store()
1113            .get_state_events(self.room_id(), event_type)
1114            .await
1115            .map_err(Into::into)
1116    }
1117
1118    /// Get all state events of a given statically-known type in this room.
1119    ///
1120    /// # Examples
1121    ///
1122    /// ```no_run
1123    /// # async {
1124    /// # let room: matrix_sdk::Room = todo!();
1125    /// use matrix_sdk::ruma::{
1126    ///     events::room::member::RoomMemberEventContent, serde::Raw,
1127    /// };
1128    ///
1129    /// let room_members =
1130    ///     room.get_state_events_static::<RoomMemberEventContent>().await?;
1131    /// # anyhow::Ok(())
1132    /// # };
1133    /// ```
1134    pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1135    where
1136        C: StaticEventContent<IsPrefix = ruma::events::False>
1137            + StaticStateEventContent
1138            + RedactContent,
1139        C::Redacted: RedactedStateEventContent,
1140    {
1141        Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1142    }
1143
1144    /// Get the state events of a given type with the given state keys in this
1145    /// room.
1146    pub async fn get_state_events_for_keys(
1147        &self,
1148        event_type: StateEventType,
1149        state_keys: &[&str],
1150    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1151        self.client
1152            .state_store()
1153            .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1154            .await
1155            .map_err(Into::into)
1156    }
1157
1158    /// Get the state events of a given statically-known type with the given
1159    /// state keys in this room.
1160    ///
1161    /// # Examples
1162    ///
1163    /// ```no_run
1164    /// # async {
1165    /// # let room: matrix_sdk::Room = todo!();
1166    /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1167    /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1168    ///
1169    /// let room_members = room
1170    ///     .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1171    ///         user_ids,
1172    ///     )
1173    ///     .await?;
1174    /// # anyhow::Ok(())
1175    /// # };
1176    /// ```
1177    pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1178        &self,
1179        state_keys: I,
1180    ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1181    where
1182        C: StaticEventContent<IsPrefix = ruma::events::False>
1183            + StaticStateEventContent
1184            + RedactContent,
1185        C::StateKey: Borrow<K>,
1186        C::Redacted: RedactedStateEventContent,
1187        K: AsRef<str> + Sized + Sync + 'a,
1188        I: IntoIterator<Item = &'a K> + Send,
1189        I::IntoIter: Send,
1190    {
1191        Ok(self
1192            .client
1193            .state_store()
1194            .get_state_events_for_keys_static(self.room_id(), state_keys)
1195            .await?)
1196    }
1197
1198    /// Get a specific state event in this room.
1199    pub async fn get_state_event(
1200        &self,
1201        event_type: StateEventType,
1202        state_key: &str,
1203    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1204        self.client
1205            .state_store()
1206            .get_state_event(self.room_id(), event_type, state_key)
1207            .await
1208            .map_err(Into::into)
1209    }
1210
1211    /// Get a specific state event of statically-known type with an empty state
1212    /// key in this room.
1213    ///
1214    /// # Examples
1215    ///
1216    /// ```no_run
1217    /// # async {
1218    /// # let room: matrix_sdk::Room = todo!();
1219    /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1220    ///
1221    /// let power_levels = room
1222    ///     .get_state_event_static::<RoomPowerLevelsEventContent>()
1223    ///     .await?
1224    ///     .expect("every room has a power_levels event")
1225    ///     .deserialize()?;
1226    /// # anyhow::Ok(())
1227    /// # };
1228    /// ```
1229    pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1230    where
1231        C: StaticEventContent<IsPrefix = ruma::events::False>
1232            + StaticStateEventContent<StateKey = EmptyStateKey>
1233            + RedactContent,
1234        C::Redacted: RedactedStateEventContent,
1235    {
1236        self.get_state_event_static_for_key(&EmptyStateKey).await
1237    }
1238
1239    /// Get a specific state event of statically-known type in this room.
1240    ///
1241    /// # Examples
1242    ///
1243    /// ```no_run
1244    /// # async {
1245    /// # let room: matrix_sdk::Room = todo!();
1246    /// use matrix_sdk::ruma::{
1247    ///     events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1248    /// };
1249    ///
1250    /// let member_event = room
1251    ///     .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1252    ///         "@alice:example.org"
1253    ///     ))
1254    ///     .await?;
1255    /// # anyhow::Ok(())
1256    /// # };
1257    /// ```
1258    pub async fn get_state_event_static_for_key<C, K>(
1259        &self,
1260        state_key: &K,
1261    ) -> Result<Option<RawSyncOrStrippedState<C>>>
1262    where
1263        C: StaticEventContent<IsPrefix = ruma::events::False>
1264            + StaticStateEventContent
1265            + RedactContent,
1266        C::StateKey: Borrow<K>,
1267        C::Redacted: RedactedStateEventContent,
1268        K: AsRef<str> + ?Sized + Sync,
1269    {
1270        Ok(self
1271            .client
1272            .state_store()
1273            .get_state_event_static_for_key(self.room_id(), state_key)
1274            .await?)
1275    }
1276
1277    /// Returns the parents this room advertises as its parents.
1278    ///
1279    /// Results are in no particular order.
1280    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1281        // Implements this algorithm:
1282        // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1283
1284        // Get all m.space.parent events for this room
1285        Ok(self
1286            .get_state_events_static::<SpaceParentEventContent>()
1287            .await?
1288            .into_iter()
1289            // Extract state key (ie. the parent's id) and sender
1290            .filter_map(|parent_event| match parent_event.deserialize() {
1291                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1292                    Some((e.state_key.to_owned(), e.sender))
1293                }
1294                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1295                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1296                Err(e) => {
1297                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1298                    None
1299                }
1300            })
1301            // Check whether the parent recognizes this room as its child
1302            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1303                let Some(parent_room) = self.client.get_room(&state_key) else {
1304                    // We are not in the room, cannot check if the relationship is reciprocal
1305                    // TODO: try peeking into the room
1306                    return Ok(ParentSpace::Unverifiable(state_key));
1307                };
1308                // Get the m.space.child state of the parent with this room's id
1309                // as state key.
1310                if let Some(child_event) = parent_room
1311                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1312                    .await?
1313                {
1314                    match child_event.deserialize() {
1315                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1316                            // There is a valid m.space.child in the parent pointing to
1317                            // this room
1318                            return Ok(ParentSpace::Reciprocal(parent_room));
1319                        }
1320                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1321                        Ok(SyncOrStrippedState::Stripped(_)) => {}
1322                        Err(e) => {
1323                            info!(
1324                                room_id = ?self.room_id(), parent_room_id = ?state_key,
1325                                "Could not deserialize m.space.child: {e}"
1326                            );
1327                        }
1328                    }
1329                    // Otherwise the event is either invalid or redacted. If
1330                    // redacted it would be missing the
1331                    // `via` key, thereby invalidating that end of the
1332                    // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1333                }
1334
1335                // No reciprocal m.space.child found, let's check if the sender has the
1336                // power to set it
1337                let Some(member) = parent_room.get_member(&sender).await? else {
1338                    // Sender is not even in the parent room
1339                    return Ok(ParentSpace::Illegitimate(parent_room));
1340                };
1341
1342                if member.can_send_state(StateEventType::SpaceChild) {
1343                    // Sender does have the power to set m.room.child
1344                    Ok(ParentSpace::WithPowerlevel(parent_room))
1345                } else {
1346                    Ok(ParentSpace::Illegitimate(parent_room))
1347                }
1348            })
1349            .collect::<FuturesUnordered<_>>())
1350    }
1351
1352    /// Read account data in this room, from storage.
1353    pub async fn account_data(
1354        &self,
1355        data_type: RoomAccountDataEventType,
1356    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1357        self.client
1358            .state_store()
1359            .get_room_account_data_event(self.room_id(), data_type)
1360            .await
1361            .map_err(Into::into)
1362    }
1363
1364    /// Get account data of a statically-known type in this room, from storage.
1365    ///
1366    /// # Examples
1367    ///
1368    /// ```no_run
1369    /// # async {
1370    /// # let room: matrix_sdk::Room = todo!();
1371    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1372    ///
1373    /// match room.account_data_static::<FullyReadEventContent>().await? {
1374    ///     Some(fully_read) => {
1375    ///         println!("Found read marker: {:?}", fully_read.deserialize()?)
1376    ///     }
1377    ///     None => println!("No read marker for this room"),
1378    /// }
1379    /// # anyhow::Ok(())
1380    /// # };
1381    /// ```
1382    pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1383    where
1384        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1385    {
1386        Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1387    }
1388
1389    /// Check if all members of this room are verified and all their devices are
1390    /// verified.
1391    ///
1392    /// Returns true if all devices in the room are verified, otherwise false.
1393    #[cfg(feature = "e2e-encryption")]
1394    pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1395        let user_ids = self
1396            .client
1397            .state_store()
1398            .get_user_ids(self.room_id(), RoomMemberships::empty())
1399            .await?;
1400
1401        for user_id in user_ids {
1402            let devices = self.client.encryption().get_user_devices(&user_id).await?;
1403            let any_unverified = devices.devices().any(|d| !d.is_verified());
1404
1405            if any_unverified {
1406                return Ok(false);
1407            }
1408        }
1409
1410        Ok(true)
1411    }
1412
1413    /// Set the given account data event for this room.
1414    ///
1415    /// # Example
1416    /// ```
1417    /// # async {
1418    /// # let room: matrix_sdk::Room = todo!();
1419    /// # let event_id: ruma::OwnedEventId = todo!();
1420    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1421    /// let content = FullyReadEventContent::new(event_id);
1422    ///
1423    /// room.set_account_data(content).await?;
1424    /// # anyhow::Ok(())
1425    /// # };
1426    /// ```
1427    pub async fn set_account_data<T>(
1428        &self,
1429        content: T,
1430    ) -> Result<set_room_account_data::v3::Response>
1431    where
1432        T: RoomAccountDataEventContent,
1433    {
1434        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1435
1436        let request = set_room_account_data::v3::Request::new(
1437            own_user.to_owned(),
1438            self.room_id().to_owned(),
1439            &content,
1440        )?;
1441
1442        Ok(self.client.send(request).await?)
1443    }
1444
1445    /// Set the given raw account data event in this room.
1446    ///
1447    /// # Example
1448    /// ```
1449    /// # async {
1450    /// # let room: matrix_sdk::Room = todo!();
1451    /// use matrix_sdk::ruma::{
1452    ///     events::{
1453    ///         AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1454    ///         marked_unread::MarkedUnreadEventContent,
1455    ///     },
1456    ///     serde::Raw,
1457    /// };
1458    /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1459    /// let full_event: AnyRoomAccountDataEventContent =
1460    ///     marked_unread_content.clone().into();
1461    /// room.set_account_data_raw(
1462    ///     marked_unread_content.event_type(),
1463    ///     Raw::new(&full_event).unwrap(),
1464    /// )
1465    /// .await?;
1466    /// # anyhow::Ok(())
1467    /// # };
1468    /// ```
1469    pub async fn set_account_data_raw(
1470        &self,
1471        event_type: RoomAccountDataEventType,
1472        content: Raw<AnyRoomAccountDataEventContent>,
1473    ) -> Result<set_room_account_data::v3::Response> {
1474        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1475
1476        let request = set_room_account_data::v3::Request::new_raw(
1477            own_user.to_owned(),
1478            self.room_id().to_owned(),
1479            event_type,
1480            content,
1481        );
1482
1483        Ok(self.client.send(request).await?)
1484    }
1485
1486    /// Adds a tag to the room, or updates it if it already exists.
1487    ///
1488    /// Returns the [`create_tag::v3::Response`] from the server.
1489    ///
1490    /// # Arguments
1491    /// * `tag` - The tag to add or update.
1492    ///
1493    /// * `tag_info` - Information about the tag, generally containing the
1494    ///   `order` parameter.
1495    ///
1496    /// # Examples
1497    ///
1498    /// ```no_run
1499    /// # use std::str::FromStr;
1500    /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1501    /// # async {
1502    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1503    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1504    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1505    /// use matrix_sdk::ruma::events::tag::TagInfo;
1506    ///
1507    /// if let Some(room) = client.get_room(&room_id) {
1508    ///     let mut tag_info = TagInfo::new();
1509    ///     tag_info.order = Some(0.9);
1510    ///     let user_tag = UserTagName::from_str("u.work")?;
1511    ///
1512    ///     room.set_tag(TagName::User(user_tag), tag_info).await?;
1513    /// }
1514    /// # anyhow::Ok(()) };
1515    /// ```
1516    pub async fn set_tag(
1517        &self,
1518        tag: TagName,
1519        tag_info: TagInfo,
1520    ) -> Result<create_tag::v3::Response> {
1521        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1522        let request = create_tag::v3::Request::new(
1523            user_id.to_owned(),
1524            self.inner.room_id().to_owned(),
1525            tag.to_string(),
1526            tag_info,
1527        );
1528        Ok(self.client.send(request).await?)
1529    }
1530
1531    /// Removes a tag from the room.
1532    ///
1533    /// Returns the [`delete_tag::v3::Response`] from the server.
1534    ///
1535    /// # Arguments
1536    /// * `tag` - The tag to remove.
1537    pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1538        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1539        let request = delete_tag::v3::Request::new(
1540            user_id.to_owned(),
1541            self.inner.room_id().to_owned(),
1542            tag.to_string(),
1543        );
1544        Ok(self.client.send(request).await?)
1545    }
1546
1547    /// Add or remove the `m.favourite` flag for this room.
1548    ///
1549    /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1550    /// room, the tag will be removed too.
1551    ///
1552    /// # Arguments
1553    ///
1554    /// * `is_favourite` - Whether to mark this room as favourite.
1555    /// * `tag_order` - The order of the tag if any.
1556    pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1557        if is_favourite {
1558            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1559
1560            self.set_tag(TagName::Favorite, tag_info).await?;
1561
1562            if self.is_low_priority() {
1563                self.remove_tag(TagName::LowPriority).await?;
1564            }
1565        } else {
1566            self.remove_tag(TagName::Favorite).await?;
1567        }
1568        Ok(())
1569    }
1570
1571    /// Add or remove the `m.lowpriority` flag for this room.
1572    ///
1573    /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1574    /// room, the tag will be removed too.
1575    ///
1576    /// # Arguments
1577    ///
1578    /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1579    /// * `tag_order` - The order of the tag if any.
1580    pub async fn set_is_low_priority(
1581        &self,
1582        is_low_priority: bool,
1583        tag_order: Option<f64>,
1584    ) -> Result<()> {
1585        if is_low_priority {
1586            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1587
1588            self.set_tag(TagName::LowPriority, tag_info).await?;
1589
1590            if self.is_favourite() {
1591                self.remove_tag(TagName::Favorite).await?;
1592            }
1593        } else {
1594            self.remove_tag(TagName::LowPriority).await?;
1595        }
1596        Ok(())
1597    }
1598
1599    /// Sets whether this room is a DM.
1600    ///
1601    /// When setting this room as DM, it will be marked as DM for all active
1602    /// members of the room. When unsetting this room as DM, it will be
1603    /// unmarked as DM for all users, not just the members.
1604    ///
1605    /// # Arguments
1606    /// * `is_direct` - Whether to mark this room as direct.
1607    pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1608        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1609
1610        let mut content = self
1611            .client
1612            .account()
1613            .account_data::<DirectEventContent>()
1614            .await?
1615            .map(|c| c.deserialize())
1616            .transpose()?
1617            .unwrap_or_default();
1618
1619        let this_room_id = self.inner.room_id();
1620
1621        if is_direct {
1622            let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1623            room_members.retain(|member| member.user_id() != self.own_user_id());
1624
1625            for member in room_members {
1626                let entry = content.entry(member.user_id().into()).or_default();
1627                if !entry.iter().any(|room_id| room_id == this_room_id) {
1628                    entry.push(this_room_id.to_owned());
1629                }
1630            }
1631        } else {
1632            for (_, list) in content.iter_mut() {
1633                list.retain(|room_id| *room_id != this_room_id);
1634            }
1635
1636            // Remove user ids that don't have any room marked as DM
1637            content.retain(|_, list| !list.is_empty());
1638        }
1639
1640        let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1641
1642        self.client.send(request).await?;
1643        Ok(())
1644    }
1645
1646    /// Tries to decrypt a room event.
1647    ///
1648    /// # Arguments
1649    /// * `event` - The room event to be decrypted.
1650    ///
1651    /// Returns the decrypted event. In the case of a decryption error, returns
1652    /// a `TimelineEvent` representing the decryption error.
1653    #[cfg(feature = "e2e-encryption")]
1654    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1655    pub async fn decrypt_event(
1656        &self,
1657        event: &Raw<OriginalSyncRoomEncryptedEvent>,
1658        push_ctx: Option<&PushContext>,
1659    ) -> Result<TimelineEvent> {
1660        let machine = self.client.olm_machine().await;
1661        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1662
1663        match machine
1664            .try_decrypt_room_event(
1665                event.cast_ref(),
1666                self.inner.room_id(),
1667                self.client.decryption_settings(),
1668            )
1669            .await?
1670        {
1671            RoomEventDecryptionResult::Decrypted(decrypted) => {
1672                let push_actions = if let Some(push_ctx) = push_ctx {
1673                    Some(push_ctx.for_event(&decrypted.event).await)
1674                } else {
1675                    None
1676                };
1677                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1678            }
1679            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1680                self.client
1681                    .encryption()
1682                    .backups()
1683                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1684                Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1685            }
1686        }
1687    }
1688
1689    /// Tries to decrypt a room event.
1690    ///
1691    /// # Arguments
1692    /// * `event` - The room event to be decrypted.
1693    ///
1694    /// Returns the decrypted event. In the case of a decryption error, returns
1695    /// a `TimelineEvent` representing the decryption error.
1696    #[cfg(feature = "experimental-encrypted-state-events")]
1697    pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1698        &self,
1699        event: &Raw<T>,
1700        push_ctx: Option<&PushContext>,
1701    ) -> Result<TimelineEvent> {
1702        let machine = self.client.olm_machine().await;
1703        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1704
1705        match machine
1706            .try_decrypt_room_event(
1707                event.cast_ref(),
1708                self.inner.room_id(),
1709                self.client.decryption_settings(),
1710            )
1711            .await?
1712        {
1713            RoomEventDecryptionResult::Decrypted(decrypted) => {
1714                let push_actions = if let Some(push_ctx) = push_ctx {
1715                    Some(push_ctx.for_event(&decrypted.event).await)
1716                } else {
1717                    None
1718                };
1719                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1720            }
1721            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1722                self.client
1723                    .encryption()
1724                    .backups()
1725                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1726                // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1727                // event.
1728                Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1729            }
1730        }
1731    }
1732
1733    /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1734    /// session_id.
1735    ///
1736    /// This may be used when we receive an update for a session, and we want to
1737    /// reflect the changes in messages we have received that were encrypted
1738    /// with that session, e.g. to remove a warning shield because a device is
1739    /// now verified.
1740    ///
1741    /// # Arguments
1742    /// * `session_id` - The ID of the Megolm session to get information for.
1743    /// * `sender` - The (claimed) sender of the event where the session was
1744    ///   used.
1745    #[cfg(feature = "e2e-encryption")]
1746    pub async fn get_encryption_info(
1747        &self,
1748        session_id: &str,
1749        sender: &UserId,
1750    ) -> Option<Arc<EncryptionInfo>> {
1751        let machine = self.client.olm_machine().await;
1752        let machine = machine.as_ref()?;
1753        machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1754    }
1755
1756    /// Forces the currently active room key, which is used to encrypt messages,
1757    /// to be rotated.
1758    ///
1759    /// A new room key will be crated and shared with all the room members the
1760    /// next time a message will be sent. You don't have to call this method,
1761    /// room keys will be rotated automatically when necessary. This method is
1762    /// still useful for debugging purposes.
1763    ///
1764    /// For more info please take a look a the [`encryption`] module
1765    /// documentation.
1766    ///
1767    /// [`encryption`]: crate::encryption
1768    #[cfg(feature = "e2e-encryption")]
1769    pub async fn discard_room_key(&self) -> Result<()> {
1770        let machine = self.client.olm_machine().await;
1771        if let Some(machine) = machine.as_ref() {
1772            machine.discard_room_key(self.inner.room_id()).await?;
1773            Ok(())
1774        } else {
1775            Err(Error::NoOlmMachine)
1776        }
1777    }
1778
1779    /// Ban the user with `UserId` from this room.
1780    ///
1781    /// # Arguments
1782    ///
1783    /// * `user_id` - The user to ban with `UserId`.
1784    ///
1785    /// * `reason` - The reason for banning this user.
1786    #[instrument(skip_all)]
1787    pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1788        let request = assign!(
1789            ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1790            { reason: reason.map(ToOwned::to_owned) }
1791        );
1792        self.client.send(request).await?;
1793        Ok(())
1794    }
1795
1796    /// Unban the user with `UserId` from this room.
1797    ///
1798    /// # Arguments
1799    ///
1800    /// * `user_id` - The user to unban with `UserId`.
1801    ///
1802    /// * `reason` - The reason for unbanning this user.
1803    #[instrument(skip_all)]
1804    pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1805        let request = assign!(
1806            unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1807            { reason: reason.map(ToOwned::to_owned) }
1808        );
1809        self.client.send(request).await?;
1810        Ok(())
1811    }
1812
1813    /// Kick a user out of this room.
1814    ///
1815    /// # Arguments
1816    ///
1817    /// * `user_id` - The `UserId` of the user that should be kicked out of the
1818    ///   room.
1819    ///
1820    /// * `reason` - Optional reason why the room member is being kicked out.
1821    #[instrument(skip_all)]
1822    pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1823        let request = assign!(
1824            kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1825            { reason: reason.map(ToOwned::to_owned) }
1826        );
1827        self.client.send(request).await?;
1828        Ok(())
1829    }
1830
1831    /// Invite the specified user by `UserId` to this room.
1832    ///
1833    /// # Arguments
1834    ///
1835    /// * `user_id` - The `UserId` of the user to invite to the room.
1836    #[instrument(skip_all)]
1837    pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1838        #[cfg(feature = "e2e-encryption")]
1839        if self.client.inner.enable_share_history_on_invite {
1840            shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1841        }
1842
1843        let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1844        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1845        self.client.send(request).await?;
1846
1847        // Force a future room members reload before sending any event to prevent UTDs
1848        // that can happen when some event is sent after a room member has been invited
1849        // but before the /sync request could fetch the membership change event.
1850        self.mark_members_missing();
1851
1852        Ok(())
1853    }
1854
1855    /// Invite the specified user by third party id to this room.
1856    ///
1857    /// # Arguments
1858    ///
1859    /// * `invite_id` - A third party id of a user to invite to the room.
1860    #[instrument(skip_all)]
1861    pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1862        let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1863        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1864        self.client.send(request).await?;
1865
1866        // Force a future room members reload before sending any event to prevent UTDs
1867        // that can happen when some event is sent after a room member has been invited
1868        // but before the /sync request could fetch the membership change event.
1869        self.mark_members_missing();
1870
1871        Ok(())
1872    }
1873
1874    /// Activate typing notice for this room.
1875    ///
1876    /// The typing notice remains active for 4s. It can be deactivate at any
1877    /// point by setting typing to `false`. If this method is called while
1878    /// the typing notice is active nothing will happen. This method can be
1879    /// called on every key stroke, since it will do nothing while typing is
1880    /// active.
1881    ///
1882    /// # Arguments
1883    ///
1884    /// * `typing` - Whether the user is typing or has stopped typing.
1885    ///
1886    /// # Examples
1887    ///
1888    /// ```no_run
1889    /// use std::time::Duration;
1890    ///
1891    /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
1892    /// # use matrix_sdk::{
1893    /// #     Client, config::SyncSettings,
1894    /// #     ruma::room_id,
1895    /// # };
1896    /// # use url::Url;
1897    ///
1898    /// # async {
1899    /// # let homeserver = Url::parse("http://localhost:8080")?;
1900    /// # let client = Client::new(homeserver).await?;
1901    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
1902    ///
1903    /// if let Some(room) = client.get_room(&room_id) {
1904    ///     room.typing_notice(true).await?
1905    /// }
1906    /// # anyhow::Ok(()) };
1907    /// ```
1908    pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1909        self.ensure_room_joined()?;
1910
1911        // Only send a request to the homeserver if the old timeout has elapsed
1912        // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
1913        let send = if let Some(typing_time) =
1914            self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1915        {
1916            if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1917                // We always reactivate the typing notice if typing is true or
1918                // we may need to deactivate it if it's
1919                // currently active if typing is false
1920                typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1921            } else {
1922                // Only send a request when we need to deactivate typing
1923                !typing
1924            }
1925        } else {
1926            // Typing notice is currently deactivated, therefore, send a request
1927            // only when it's about to be activated
1928            typing
1929        };
1930
1931        if send {
1932            self.send_typing_notice(typing).await?;
1933        }
1934
1935        Ok(())
1936    }
1937
1938    #[instrument(name = "typing_notice", skip(self))]
1939    async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1940        let typing = if typing {
1941            self.client
1942                .inner
1943                .typing_notice_times
1944                .write()
1945                .unwrap()
1946                .insert(self.room_id().to_owned(), Instant::now());
1947            Typing::Yes(TYPING_NOTICE_TIMEOUT)
1948        } else {
1949            self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1950            Typing::No
1951        };
1952
1953        let request = create_typing_event::v3::Request::new(
1954            self.own_user_id().to_owned(),
1955            self.room_id().to_owned(),
1956            typing,
1957        );
1958
1959        self.client.send(request).await?;
1960
1961        Ok(())
1962    }
1963
1964    /// Send a request to set a single receipt.
1965    ///
1966    /// If an unthreaded receipt is sent, this will also unset the unread flag
1967    /// of the room if necessary.
1968    ///
1969    /// # Arguments
1970    ///
1971    /// * `receipt_type` - The type of the receipt to set. Note that it is
1972    ///   possible to set the fully-read marker although it is technically not a
1973    ///   receipt.
1974    ///
1975    /// * `thread` - The thread where this receipt should apply, if any. Note
1976    ///   that this must be [`ReceiptThread::Unthreaded`] when sending a
1977    ///   [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
1978    ///
1979    /// * `event_id` - The `EventId` of the event to set the receipt on.
1980    #[instrument(skip_all)]
1981    pub async fn send_single_receipt(
1982        &self,
1983        receipt_type: create_receipt::v3::ReceiptType,
1984        thread: ReceiptThread,
1985        event_id: OwnedEventId,
1986    ) -> Result<()> {
1987        // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
1988        // string key.
1989        let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1990
1991        self.client
1992            .inner
1993            .locks
1994            .read_receipt_deduplicated_handler
1995            .run((request_key, event_id.clone()), async {
1996                // We will unset the unread flag if we send an unthreaded receipt.
1997                let is_unthreaded = thread == ReceiptThread::Unthreaded;
1998
1999                let mut request = create_receipt::v3::Request::new(
2000                    self.room_id().to_owned(),
2001                    receipt_type,
2002                    event_id,
2003                );
2004                request.thread = thread;
2005
2006                self.client.send(request).await?;
2007
2008                if is_unthreaded {
2009                    self.set_unread_flag(false).await?;
2010                }
2011
2012                Ok(())
2013            })
2014            .await
2015    }
2016
2017    /// Send a request to set multiple receipts at once.
2018    ///
2019    /// This will also unset the unread flag of the room if necessary.
2020    ///
2021    /// # Arguments
2022    ///
2023    /// * `receipts` - The `Receipts` to send.
2024    ///
2025    /// If `receipts` is empty, this is a no-op.
2026    #[instrument(skip_all)]
2027    pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2028        if receipts.is_empty() {
2029            return Ok(());
2030        }
2031
2032        let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2033        let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2034            fully_read,
2035            read_receipt: public_read_receipt,
2036            private_read_receipt,
2037        });
2038
2039        self.client.send(request).await?;
2040
2041        self.set_unread_flag(false).await?;
2042
2043        Ok(())
2044    }
2045
2046    /// Helper function to enable End-to-end encryption in this room.
2047    /// `encrypted_state_events` is not used unless the
2048    /// `experimental-encrypted-state-events` feature is enabled.
2049    #[allow(unused_variables, unused_mut)]
2050    async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2051        use ruma::{
2052            EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2053        };
2054        const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2055
2056        if !self.latest_encryption_state().await?.is_encrypted() {
2057            let mut content =
2058                RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2059            #[cfg(feature = "experimental-encrypted-state-events")]
2060            if encrypted_state_events {
2061                content = content.with_encrypted_state();
2062            }
2063            self.send_state_event(content).await?;
2064
2065            // Spin on the sync beat event, since the first sync we receive might not
2066            // include the encryption event.
2067            //
2068            // TODO do we want to return an error here if we time out? This
2069            // could be quite useful if someone wants to enable encryption and
2070            // send a message right after it's enabled.
2071            let res = timeout(
2072                async {
2073                    loop {
2074                        // Listen for sync events, then check if the encryption state is known.
2075                        self.client.inner.sync_beat.listen().await;
2076                        let _sync_lock = self.client.base_client().sync_lock().lock().await;
2077                        if !self.inner.encryption_state().is_unknown() {
2078                            break;
2079                        }
2080                    }
2081                },
2082                SYNC_WAIT_TIME,
2083            )
2084            .await;
2085
2086            let _sync_lock = self.client.base_client().sync_lock().lock().await;
2087
2088            // If encryption was enabled, return.
2089            #[cfg(not(feature = "experimental-encrypted-state-events"))]
2090            if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2091                debug!("room successfully marked as encrypted");
2092                return Ok(());
2093            }
2094
2095            // If encryption with state event encryption was enabled, return.
2096            #[cfg(feature = "experimental-encrypted-state-events")]
2097            if res.is_ok() && {
2098                if encrypted_state_events {
2099                    self.inner.encryption_state().is_state_encrypted()
2100                } else {
2101                    self.inner.encryption_state().is_encrypted()
2102                }
2103            } {
2104                debug!("room successfully marked as encrypted");
2105                return Ok(());
2106            }
2107
2108            // If after waiting for multiple syncs, we don't have the encryption state we
2109            // expect, assume the local encryption state is incorrect; this will
2110            // cause the SDK to re-request it later for confirmation, instead of
2111            // assuming it's sync'd and correct (and not encrypted).
2112            debug!("still not marked as encrypted, marking encryption state as missing");
2113
2114            let mut room_info = self.clone_info();
2115            room_info.mark_encryption_state_missing();
2116            let mut changes = StateChanges::default();
2117            changes.add_room(room_info.clone());
2118
2119            self.client.state_store().save_changes(&changes).await?;
2120            self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2121        }
2122
2123        Ok(())
2124    }
2125
2126    /// Enable End-to-end encryption in this room.
2127    ///
2128    /// This method will be a noop if encryption is already enabled, otherwise
2129    /// sends a `m.room.encryption` state event to the room. This might fail if
2130    /// you don't have the appropriate power level to enable end-to-end
2131    /// encryption.
2132    ///
2133    /// A sync needs to be received to update the local room state. This method
2134    /// will wait for a sync to be received, this might time out if no
2135    /// sync loop is running or if the server is slow.
2136    ///
2137    /// # Examples
2138    ///
2139    /// ```no_run
2140    /// # use matrix_sdk::{
2141    /// #     Client, config::SyncSettings,
2142    /// #     ruma::room_id,
2143    /// # };
2144    /// # use url::Url;
2145    /// #
2146    /// # async {
2147    /// # let homeserver = Url::parse("http://localhost:8080")?;
2148    /// # let client = Client::new(homeserver).await?;
2149    /// # let room_id = room_id!("!test:localhost");
2150    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2151    ///
2152    /// if let Some(room) = client.get_room(&room_id) {
2153    ///     room.enable_encryption().await?
2154    /// }
2155    /// # anyhow::Ok(()) };
2156    /// ```
2157    #[instrument(skip_all)]
2158    pub async fn enable_encryption(&self) -> Result<()> {
2159        self.enable_encryption_inner(false).await
2160    }
2161
2162    /// Enable End-to-end encryption in this room, opting into experimental
2163    /// state event encryption.
2164    ///
2165    /// This method will be a noop if encryption is already enabled, otherwise
2166    /// sends a `m.room.encryption` state event to the room. This might fail if
2167    /// you don't have the appropriate power level to enable end-to-end
2168    /// encryption.
2169    ///
2170    /// A sync needs to be received to update the local room state. This method
2171    /// will wait for a sync to be received, this might time out if no
2172    /// sync loop is running or if the server is slow.
2173    ///
2174    /// # Examples
2175    ///
2176    /// ```no_run
2177    /// # use matrix_sdk::{
2178    /// #     Client, config::SyncSettings,
2179    /// #     ruma::room_id,
2180    /// # };
2181    /// # use url::Url;
2182    /// #
2183    /// # async {
2184    /// # let homeserver = Url::parse("http://localhost:8080")?;
2185    /// # let client = Client::new(homeserver).await?;
2186    /// # let room_id = room_id!("!test:localhost");
2187    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2188    ///
2189    /// if let Some(room) = client.get_room(&room_id) {
2190    ///     room.enable_encryption_with_state_event_encryption().await?
2191    /// }
2192    /// # anyhow::Ok(()) };
2193    /// ```
2194    #[instrument(skip_all)]
2195    #[cfg(feature = "experimental-encrypted-state-events")]
2196    pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2197        self.enable_encryption_inner(true).await
2198    }
2199
2200    /// Share a room key with users in the given room.
2201    ///
2202    /// This will create Olm sessions with all the users/device pairs in the
2203    /// room if necessary and share a room key that can be shared with them.
2204    ///
2205    /// Does nothing if no room key needs to be shared.
2206    // TODO: expose this publicly so people can pre-share a group session if
2207    // e.g. a user starts to type a message for a room.
2208    #[cfg(feature = "e2e-encryption")]
2209    #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
2210    async fn preshare_room_key(&self) -> Result<()> {
2211        self.ensure_room_joined()?;
2212
2213        // Take and release the lock on the store, if needs be.
2214        let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2215        tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
2216
2217        self.client
2218            .locks()
2219            .group_session_deduplicated_handler
2220            .run(self.room_id().to_owned(), async move {
2221                {
2222                    let members = self
2223                        .client
2224                        .state_store()
2225                        .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2226                        .await?;
2227                    self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2228                };
2229
2230                let response = self.share_room_key().await;
2231
2232                // If one of the responses failed invalidate the group
2233                // session as using it would end up in undecryptable
2234                // messages.
2235                if let Err(r) = response {
2236                    let machine = self.client.olm_machine().await;
2237                    if let Some(machine) = machine.as_ref() {
2238                        machine.discard_room_key(self.room_id()).await?;
2239                    }
2240                    return Err(r);
2241                }
2242
2243                Ok(())
2244            })
2245            .await
2246    }
2247
2248    /// Share a group session for a room.
2249    ///
2250    /// # Panics
2251    ///
2252    /// Panics if the client isn't logged in.
2253    #[cfg(feature = "e2e-encryption")]
2254    #[instrument(skip_all)]
2255    async fn share_room_key(&self) -> Result<()> {
2256        self.ensure_room_joined()?;
2257
2258        let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2259
2260        for request in requests {
2261            let response = self.client.send_to_device(&request).await?;
2262            self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2263        }
2264
2265        Ok(())
2266    }
2267
2268    /// Wait for the room to be fully synced.
2269    ///
2270    /// This method makes sure the room that was returned when joining a room
2271    /// has been echoed back in the sync.
2272    ///
2273    /// Warning: This waits until a sync happens and does not return if no sync
2274    /// is happening. It can also return early when the room is not a joined
2275    /// room anymore.
2276    #[instrument(skip_all)]
2277    pub async fn sync_up(&self) {
2278        while !self.is_synced() && self.state() == RoomState::Joined {
2279            let wait_for_beat = self.client.inner.sync_beat.listen();
2280            // We don't care whether it's a timeout or a sync beat.
2281            let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2282        }
2283    }
2284
2285    /// Send a message-like event to this room.
2286    ///
2287    /// Returns the parsed response from the server.
2288    ///
2289    /// If the encryption feature is enabled this method will transparently
2290    /// encrypt the event if this room is encrypted (except for `m.reaction`
2291    /// events, which are never encrypted).
2292    ///
2293    /// **Note**: If you just want to send an event with custom JSON content to
2294    /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2295    ///
2296    /// If you want to set a transaction ID for the event, use
2297    /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2298    /// on the returned value before `.await`ing it.
2299    ///
2300    /// # Arguments
2301    ///
2302    /// * `content` - The content of the message event.
2303    ///
2304    /// # Examples
2305    ///
2306    /// ```no_run
2307    /// # use std::sync::{Arc, RwLock};
2308    /// # use matrix_sdk::{Client, config::SyncSettings};
2309    /// # use url::Url;
2310    /// # use matrix_sdk::ruma::room_id;
2311    /// # use serde::{Deserialize, Serialize};
2312    /// use matrix_sdk::ruma::{
2313    ///     MilliSecondsSinceUnixEpoch, TransactionId,
2314    ///     events::{
2315    ///         macros::EventContent,
2316    ///         room::message::{RoomMessageEventContent, TextMessageEventContent},
2317    ///     },
2318    ///     uint,
2319    /// };
2320    ///
2321    /// # async {
2322    /// # let homeserver = Url::parse("http://localhost:8080")?;
2323    /// # let mut client = Client::new(homeserver).await?;
2324    /// # let room_id = room_id!("!test:localhost");
2325    /// let content = RoomMessageEventContent::text_plain("Hello world");
2326    /// let txn_id = TransactionId::new();
2327    ///
2328    /// if let Some(room) = client.get_room(&room_id) {
2329    ///     room.send(content).with_transaction_id(txn_id).await?;
2330    /// }
2331    ///
2332    /// // Custom events work too:
2333    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2334    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2335    /// struct TokenEventContent {
2336    ///     token: String,
2337    ///     #[serde(rename = "exp")]
2338    ///     expires_at: MilliSecondsSinceUnixEpoch,
2339    /// }
2340    ///
2341    /// # fn generate_token() -> String { todo!() }
2342    /// let content = TokenEventContent {
2343    ///     token: generate_token(),
2344    ///     expires_at: {
2345    ///         let now = MilliSecondsSinceUnixEpoch::now();
2346    ///         MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2347    ///     },
2348    /// };
2349    ///
2350    /// if let Some(room) = client.get_room(&room_id) {
2351    ///     room.send(content).await?;
2352    /// }
2353    /// # anyhow::Ok(()) };
2354    /// ```
2355    pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2356        SendMessageLikeEvent::new(self, content)
2357    }
2358
2359    /// Run /keys/query requests for all the non-tracked users, and for users
2360    /// with an out-of-date device list.
2361    #[cfg(feature = "e2e-encryption")]
2362    async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2363        let olm = self.client.olm_machine().await;
2364        let olm = olm.as_ref().expect("Olm machine wasn't started");
2365
2366        let members =
2367            self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2368
2369        let tracked: HashMap<_, _> = olm
2370            .store()
2371            .load_tracked_users()
2372            .await?
2373            .into_iter()
2374            .map(|tracked| (tracked.user_id, tracked.dirty))
2375            .collect();
2376
2377        // A member has no unknown devices iff it was tracked *and* the tracking is
2378        // not considered dirty.
2379        let members_with_unknown_devices =
2380            members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2381
2382        let (req_id, request) =
2383            olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2384
2385        if !request.device_keys.is_empty() {
2386            self.client.keys_query(&req_id, request.device_keys).await?;
2387        }
2388
2389        Ok(())
2390    }
2391
2392    /// Send a message-like event with custom JSON content to this room.
2393    ///
2394    /// Returns the parsed response from the server.
2395    ///
2396    /// If the encryption feature is enabled this method will transparently
2397    /// encrypt the event if this room is encrypted (except for `m.reaction`
2398    /// events, which are never encrypted).
2399    ///
2400    /// This method is equivalent to the [`send()`][Self::send] method but
2401    /// allows sending custom JSON payloads, e.g. constructed using the
2402    /// [`serde_json::json!()`] macro.
2403    ///
2404    /// If you want to set a transaction ID for the event, use
2405    /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2406    /// on the returned value before `.await`ing it.
2407    ///
2408    /// # Arguments
2409    ///
2410    /// * `event_type` - The type of the event.
2411    ///
2412    /// * `content` - The content of the event as a raw JSON value. The argument
2413    ///   type can be `serde_json::Value`, but also other raw JSON types; for
2414    ///   the full list check the documentation of
2415    ///   [`IntoRawMessageLikeEventContent`].
2416    ///
2417    /// # Examples
2418    ///
2419    /// ```no_run
2420    /// # use std::sync::{Arc, RwLock};
2421    /// # use matrix_sdk::{Client, config::SyncSettings};
2422    /// # use url::Url;
2423    /// # use matrix_sdk::ruma::room_id;
2424    /// # async {
2425    /// # let homeserver = Url::parse("http://localhost:8080")?;
2426    /// # let mut client = Client::new(homeserver).await?;
2427    /// # let room_id = room_id!("!test:localhost");
2428    /// use serde_json::json;
2429    ///
2430    /// if let Some(room) = client.get_room(&room_id) {
2431    ///     room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2432    /// }
2433    /// # anyhow::Ok(()) };
2434    /// ```
2435    #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2436    pub fn send_raw<'a>(
2437        &'a self,
2438        event_type: &'a str,
2439        content: impl IntoRawMessageLikeEventContent,
2440    ) -> SendRawMessageLikeEvent<'a> {
2441        // Note: the recorded instrument fields are saved in
2442        // `SendRawMessageLikeEvent::into_future`.
2443        SendRawMessageLikeEvent::new(self, event_type, content)
2444    }
2445
2446    /// Send an attachment to this room.
2447    ///
2448    /// This will upload the given data that the reader produces using the
2449    /// [`upload()`] method and post an event to the given room.
2450    /// If the room is encrypted and the encryption feature is enabled the
2451    /// upload will be encrypted.
2452    ///
2453    /// This is a convenience method that calls the
2454    /// [`upload()`] and afterwards the [`send()`].
2455    ///
2456    /// # Arguments
2457    /// * `filename` - The file name.
2458    ///
2459    /// * `content_type` - The type of the media, this will be used as the
2460    /// content-type header.
2461    ///
2462    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2463    /// media.
2464    ///
2465    /// * `config` - Metadata and configuration for the attachment.
2466    ///
2467    /// # Examples
2468    ///
2469    /// ```no_run
2470    /// # use std::fs;
2471    /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2472    /// # use url::Url;
2473    /// # use mime;
2474    /// # async {
2475    /// # let homeserver = Url::parse("http://localhost:8080")?;
2476    /// # let mut client = Client::new(homeserver).await?;
2477    /// # let room_id = room_id!("!test:localhost");
2478    /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2479    ///
2480    /// if let Some(room) = client.get_room(&room_id) {
2481    ///     room.send_attachment(
2482    ///         "my_favorite_cat.jpg",
2483    ///         &mime::IMAGE_JPEG,
2484    ///         image,
2485    ///         AttachmentConfig::new(),
2486    ///     ).await?;
2487    /// }
2488    /// # anyhow::Ok(()) };
2489    /// ```
2490    ///
2491    /// [`upload()`]: crate::Media::upload
2492    /// [`send()`]: Self::send
2493    #[instrument(skip_all)]
2494    pub fn send_attachment<'a>(
2495        &'a self,
2496        filename: impl Into<String>,
2497        content_type: &'a Mime,
2498        data: Vec<u8>,
2499        config: AttachmentConfig,
2500    ) -> SendAttachment<'a> {
2501        SendAttachment::new(self, filename.into(), content_type, data, config)
2502    }
2503
2504    /// Prepare and send an attachment to this room.
2505    ///
2506    /// This will upload the given data that the reader produces using the
2507    /// [`upload()`](#method.upload) method and post an event to the given room.
2508    /// If the room is encrypted and the encryption feature is enabled the
2509    /// upload will be encrypted.
2510    ///
2511    /// This is a convenience method that calls the
2512    /// [`Client::upload()`](#Client::method.upload) and afterwards the
2513    /// [`send()`](#method.send).
2514    ///
2515    /// # Arguments
2516    /// * `filename` - The file name.
2517    ///
2518    /// * `content_type` - The type of the media, this will be used as the
2519    ///   content-type header.
2520    ///
2521    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2522    ///   media.
2523    ///
2524    /// * `config` - Metadata and configuration for the attachment.
2525    ///
2526    /// * `send_progress` - An observable to transmit forward progress about the
2527    ///   upload.
2528    ///
2529    /// * `store_in_cache` - A boolean defining whether the uploaded media will
2530    ///   be stored in the cache immediately after a successful upload.
2531    #[instrument(skip_all)]
2532    pub(super) async fn prepare_and_send_attachment<'a>(
2533        &'a self,
2534        filename: String,
2535        content_type: &'a Mime,
2536        data: Vec<u8>,
2537        mut config: AttachmentConfig,
2538        send_progress: SharedObservable<TransmissionProgress>,
2539        store_in_cache: bool,
2540    ) -> Result<send_message_event::v3::Response> {
2541        self.ensure_room_joined()?;
2542
2543        let txn_id = config.txn_id.take();
2544        let mentions = config.mentions.take();
2545
2546        let thumbnail = config.thumbnail.take();
2547
2548        // If necessary, store caching data for the thumbnail ahead of time.
2549        let thumbnail_cache_info = if store_in_cache {
2550            thumbnail
2551                .as_ref()
2552                .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2553        } else {
2554            None
2555        };
2556
2557        #[cfg(feature = "e2e-encryption")]
2558        let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2559            self.client
2560                .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2561                .await?
2562        } else {
2563            self.client
2564                .media()
2565                .upload_plain_media_and_thumbnail(
2566                    content_type,
2567                    // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2568                    // similar.
2569                    data.clone(),
2570                    thumbnail,
2571                    send_progress,
2572                )
2573                .await?
2574        };
2575
2576        #[cfg(not(feature = "e2e-encryption"))]
2577        let (media_source, thumbnail) = self
2578            .client
2579            .media()
2580            .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2581            .await?;
2582
2583        if store_in_cache {
2584            let media_store_lock_guard = self.client.media_store().lock().await?;
2585
2586            // A failure to cache shouldn't prevent the whole upload from finishing
2587            // properly, so only log errors during caching.
2588
2589            debug!("caching the media");
2590            let request =
2591                MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2592
2593            if let Err(err) = media_store_lock_guard
2594                .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2595                .await
2596            {
2597                warn!("unable to cache the media after uploading it: {err}");
2598            }
2599
2600            if let Some(((data, height, width), source)) =
2601                thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2602            {
2603                debug!("caching the thumbnail");
2604
2605                let request = MediaRequestParameters {
2606                    source: source.clone(),
2607                    format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2608                };
2609
2610                if let Err(err) = media_store_lock_guard
2611                    .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2612                    .await
2613                {
2614                    warn!("unable to cache the media after uploading it: {err}");
2615                }
2616            }
2617        }
2618
2619        let content = self
2620            .make_media_event(
2621                Room::make_attachment_type(
2622                    content_type,
2623                    filename,
2624                    media_source,
2625                    config.caption,
2626                    config.formatted_caption,
2627                    config.info,
2628                    thumbnail,
2629                ),
2630                mentions,
2631                config.reply,
2632            )
2633            .await?;
2634
2635        let mut fut = self.send(content);
2636        if let Some(txn_id) = txn_id {
2637            fut = fut.with_transaction_id(txn_id);
2638        }
2639        fut.await
2640    }
2641
2642    /// Creates the inner [`MessageType`] for an already-uploaded media file
2643    /// provided by its source.
2644    #[allow(clippy::too_many_arguments)]
2645    pub(crate) fn make_attachment_type(
2646        content_type: &Mime,
2647        filename: String,
2648        source: MediaSource,
2649        caption: Option<String>,
2650        formatted_caption: Option<FormattedBody>,
2651        info: Option<AttachmentInfo>,
2652        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2653    ) -> MessageType {
2654        make_media_type!(
2655            MessageType,
2656            content_type,
2657            filename,
2658            source,
2659            caption,
2660            formatted_caption,
2661            info,
2662            thumbnail
2663        )
2664    }
2665
2666    /// Creates the [`RoomMessageEventContent`] based on the message type,
2667    /// mentions and reply information.
2668    pub(crate) async fn make_media_event(
2669        &self,
2670        msg_type: MessageType,
2671        mentions: Option<Mentions>,
2672        reply: Option<Reply>,
2673    ) -> Result<RoomMessageEventContent> {
2674        let mut content = RoomMessageEventContent::new(msg_type);
2675        if let Some(mentions) = mentions {
2676            content = content.add_mentions(mentions);
2677        }
2678        if let Some(reply) = reply {
2679            // Since we just created the event, there is no relation attached to it. Thus,
2680            // it is safe to add the reply relation without overriding anything.
2681            content = self.make_reply_event(content.into(), reply).await?;
2682        }
2683        Ok(content)
2684    }
2685
2686    /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
2687    /// provided by its source.
2688    #[cfg(feature = "unstable-msc4274")]
2689    #[allow(clippy::too_many_arguments)]
2690    pub(crate) fn make_gallery_item_type(
2691        content_type: &Mime,
2692        filename: String,
2693        source: MediaSource,
2694        caption: Option<String>,
2695        formatted_caption: Option<FormattedBody>,
2696        info: Option<AttachmentInfo>,
2697        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2698    ) -> GalleryItemType {
2699        make_media_type!(
2700            GalleryItemType,
2701            content_type,
2702            filename,
2703            source,
2704            caption,
2705            formatted_caption,
2706            info,
2707            thumbnail
2708        )
2709    }
2710
2711    /// Update the power levels of a select set of users of this room.
2712    ///
2713    /// Issue a `power_levels` state event request to the server, changing the
2714    /// given UserId -> Int levels. May fail if the `power_levels` aren't
2715    /// locally known yet or the server rejects the state event update, e.g.
2716    /// because of insufficient permissions. Neither permissions to update
2717    /// nor whether the data might be stale is checked prior to issuing the
2718    /// request.
2719    pub async fn update_power_levels(
2720        &self,
2721        updates: Vec<(&UserId, Int)>,
2722    ) -> Result<send_state_event::v3::Response> {
2723        let mut power_levels = self.power_levels().await?;
2724
2725        for (user_id, new_level) in updates {
2726            if new_level == power_levels.users_default {
2727                power_levels.users.remove(user_id);
2728            } else {
2729                power_levels.users.insert(user_id.to_owned(), new_level);
2730            }
2731        }
2732
2733        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2734    }
2735
2736    /// Applies a set of power level changes to this room.
2737    ///
2738    /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2739    /// remain unchanged.
2740    pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2741        let mut power_levels = self.power_levels().await?;
2742        power_levels.apply(changes)?;
2743        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2744        Ok(())
2745    }
2746
2747    /// Resets the room's power levels to the default values
2748    ///
2749    /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2750    pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2751        let creators = self.creators().unwrap_or_default();
2752        let rules = self.clone_info().room_version_rules_or_default();
2753
2754        let default_power_levels =
2755            RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2756        let changes = RoomPowerLevelChanges::from(default_power_levels);
2757        self.apply_power_level_changes(changes).await?;
2758        Ok(self.power_levels().await?)
2759    }
2760
2761    /// Gets the suggested role for the user with the provided `user_id`.
2762    ///
2763    /// This method checks the `RoomPowerLevels` events instead of loading the
2764    /// member list and looking for the member.
2765    pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2766        let power_level = self.get_user_power_level(user_id).await?;
2767        Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2768    }
2769
2770    /// Gets the power level the user with the provided `user_id`.
2771    ///
2772    /// This method checks the `RoomPowerLevels` events instead of loading the
2773    /// member list and looking for the member.
2774    pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2775        let event = self.power_levels().await?;
2776        Ok(event.for_user(user_id))
2777    }
2778
2779    /// Gets a map with the `UserId` of users with power levels other than `0`
2780    /// and this power level.
2781    pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2782        let power_levels = self.power_levels().await.ok();
2783        let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2784        if let Some(power_levels) = power_levels {
2785            for (id, level) in power_levels.users.into_iter() {
2786                user_power_levels.insert(id, level.into());
2787            }
2788        }
2789        user_power_levels
2790    }
2791
2792    /// Sets the name of this room.
2793    pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2794        self.send_state_event(RoomNameEventContent::new(name)).await
2795    }
2796
2797    /// Sets a new topic for this room.
2798    pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2799        self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2800    }
2801
2802    /// Sets the new avatar url for this room.
2803    ///
2804    /// # Arguments
2805    /// * `avatar_url` - The owned Matrix uri that represents the avatar
2806    /// * `info` - The optional image info that can be provided for the avatar
2807    pub async fn set_avatar_url(
2808        &self,
2809        url: &MxcUri,
2810        info: Option<avatar::ImageInfo>,
2811    ) -> Result<send_state_event::v3::Response> {
2812        self.ensure_room_joined()?;
2813
2814        let mut room_avatar_event = RoomAvatarEventContent::new();
2815        room_avatar_event.url = Some(url.to_owned());
2816        room_avatar_event.info = info.map(Box::new);
2817
2818        self.send_state_event(room_avatar_event).await
2819    }
2820
2821    /// Removes the avatar from the room
2822    pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2823        self.send_state_event(RoomAvatarEventContent::new()).await
2824    }
2825
2826    /// Uploads a new avatar for this room.
2827    ///
2828    /// # Arguments
2829    /// * `mime` - The mime type describing the data
2830    /// * `data` - The data representation of the avatar
2831    /// * `info` - The optional image info provided for the avatar, the blurhash
2832    ///   and the mimetype will always be updated
2833    pub async fn upload_avatar(
2834        &self,
2835        mime: &Mime,
2836        data: Vec<u8>,
2837        info: Option<avatar::ImageInfo>,
2838    ) -> Result<send_state_event::v3::Response> {
2839        self.ensure_room_joined()?;
2840
2841        let upload_response = self.client.media().upload(mime, data, None).await?;
2842        let mut info = info.unwrap_or_default();
2843        info.blurhash = upload_response.blurhash;
2844        info.mimetype = Some(mime.to_string());
2845
2846        self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2847    }
2848
2849    /// Send a state event with an empty state key to the homeserver.
2850    ///
2851    /// For state events with a non-empty state key, see
2852    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
2853    ///
2854    /// Returns the parsed response from the server.
2855    ///
2856    /// # Arguments
2857    ///
2858    /// * `content` - The content of the state event.
2859    ///
2860    /// # Examples
2861    ///
2862    /// ```no_run
2863    /// # use serde::{Deserialize, Serialize};
2864    /// # async {
2865    /// # let joined_room: matrix_sdk::Room = todo!();
2866    /// use matrix_sdk::ruma::{
2867    ///     EventEncryptionAlgorithm,
2868    ///     events::{
2869    ///         EmptyStateKey, macros::EventContent,
2870    ///         room::encryption::RoomEncryptionEventContent,
2871    ///     },
2872    /// };
2873    ///
2874    /// let encryption_event_content = RoomEncryptionEventContent::new(
2875    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
2876    /// );
2877    /// joined_room.send_state_event(encryption_event_content).await?;
2878    ///
2879    /// // Custom event:
2880    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2881    /// #[ruma_event(
2882    ///     type = "org.matrix.msc_9000.xxx",
2883    ///     kind = State,
2884    ///     state_key_type = EmptyStateKey,
2885    /// )]
2886    /// struct XxxStateEventContent {/* fields... */}
2887    ///
2888    /// let content: XxxStateEventContent = todo!();
2889    /// joined_room.send_state_event(content).await?;
2890    /// # anyhow::Ok(()) };
2891    /// ```
2892    #[cfg(not(feature = "experimental-encrypted-state-events"))]
2893    #[instrument(skip_all)]
2894    pub async fn send_state_event(
2895        &self,
2896        content: impl StateEventContent<StateKey = EmptyStateKey>,
2897    ) -> Result<send_state_event::v3::Response> {
2898        self.send_state_event_for_key(&EmptyStateKey, content).await
2899    }
2900
2901    /// Send a state event with an empty state key to the homeserver.
2902    ///
2903    /// For state events with a non-empty state key, see
2904    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
2905    ///
2906    /// If the experimental state event encryption feature is enabled, this
2907    /// method will transparently encrypt the event if this room is
2908    /// encrypted (except if the event type is considered critical for the room
2909    /// to function, as outlined in [MSC3414][msc3414]).
2910    ///
2911    /// Returns the parsed response from the server.
2912    ///
2913    /// # Arguments
2914    ///
2915    /// * `content` - The content of the state event.
2916    ///
2917    /// # Examples
2918    ///
2919    /// ```no_run
2920    /// # use serde::{Deserialize, Serialize};
2921    /// # async {
2922    /// # let joined_room: matrix_sdk::Room = todo!();
2923    /// use matrix_sdk::ruma::{
2924    ///     EventEncryptionAlgorithm,
2925    ///     events::{
2926    ///         EmptyStateKey, macros::EventContent,
2927    ///         room::encryption::RoomEncryptionEventContent,
2928    ///     },
2929    /// };
2930    ///
2931    /// let encryption_event_content = RoomEncryptionEventContent::new(
2932    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
2933    /// );
2934    /// joined_room.send_state_event(encryption_event_content).await?;
2935    ///
2936    /// // Custom event:
2937    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2938    /// #[ruma_event(
2939    ///     type = "org.matrix.msc_9000.xxx",
2940    ///     kind = State,
2941    ///     state_key_type = EmptyStateKey,
2942    /// )]
2943    /// struct XxxStateEventContent {/* fields... */}
2944    ///
2945    /// let content: XxxStateEventContent = todo!();
2946    /// joined_room.send_state_event(content).await?;
2947    /// # anyhow::Ok(()) };
2948    /// ```
2949    ///
2950    /// [msc3414]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/3414-encrypted-state.md
2951    #[cfg(feature = "experimental-encrypted-state-events")]
2952    #[instrument(skip_all)]
2953    pub fn send_state_event<'a>(
2954        &'a self,
2955        content: impl StateEventContent<StateKey = EmptyStateKey>,
2956    ) -> SendStateEvent<'a> {
2957        self.send_state_event_for_key(&EmptyStateKey, content)
2958    }
2959
2960    /// Send a state event to the homeserver.
2961    ///
2962    /// Returns the parsed response from the server.
2963    ///
2964    /// # Arguments
2965    ///
2966    /// * `content` - The content of the state event.
2967    ///
2968    /// * `state_key` - A unique key which defines the overwriting semantics for
2969    ///   this piece of room state.
2970    ///
2971    /// # Examples
2972    ///
2973    /// ```no_run
2974    /// # use serde::{Deserialize, Serialize};
2975    /// # async {
2976    /// # let joined_room: matrix_sdk::Room = todo!();
2977    /// use matrix_sdk::ruma::{
2978    ///     events::{
2979    ///         macros::EventContent,
2980    ///         room::member::{RoomMemberEventContent, MembershipState},
2981    ///     },
2982    ///     mxc_uri,
2983    /// };
2984    ///
2985    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
2986    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
2987    /// content.avatar_url = Some(avatar_url);
2988    ///
2989    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
2990    ///
2991    /// // Custom event:
2992    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2993    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
2994    /// struct XxxStateEventContent { /* fields... */ }
2995    ///
2996    /// let content: XxxStateEventContent = todo!();
2997    /// joined_room.send_state_event_for_key("foo", content).await?;
2998    /// # anyhow::Ok(()) };
2999    /// ```
3000    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3001    pub async fn send_state_event_for_key<C, K>(
3002        &self,
3003        state_key: &K,
3004        content: C,
3005    ) -> Result<send_state_event::v3::Response>
3006    where
3007        C: StateEventContent,
3008        C::StateKey: Borrow<K>,
3009        K: AsRef<str> + ?Sized,
3010    {
3011        self.ensure_room_joined()?;
3012        let request =
3013            send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3014        let response = self.client.send(request).await?;
3015        Ok(response)
3016    }
3017
3018    /// Send a state event to the homeserver. If state encryption is enabled in
3019    /// this room, the event will be encrypted.
3020    ///
3021    /// If the experimental state event encryption feature is enabled, this
3022    /// method will transparently encrypt the event if this room is
3023    /// encrypted (except if the event type is considered critical for the room
3024    /// to function, as outlined in [MSC3414][msc3414]).
3025    ///
3026    /// Returns the parsed response from the server.
3027    ///
3028    /// # Arguments
3029    ///
3030    /// * `content` - The content of the state event.
3031    ///
3032    /// * `state_key` - A unique key which defines the overwriting semantics for
3033    ///   this piece of room state.
3034    ///
3035    /// # Examples
3036    ///
3037    /// ```no_run
3038    /// # use serde::{Deserialize, Serialize};
3039    /// # async {
3040    /// # let joined_room: matrix_sdk::Room = todo!();
3041    /// use matrix_sdk::ruma::{
3042    ///     events::{
3043    ///         macros::EventContent,
3044    ///         room::member::{RoomMemberEventContent, MembershipState},
3045    ///     },
3046    ///     mxc_uri,
3047    /// };
3048    ///
3049    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3050    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3051    /// content.avatar_url = Some(avatar_url);
3052    ///
3053    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3054    ///
3055    /// // Custom event:
3056    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3057    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3058    /// struct XxxStateEventContent { /* fields... */ }
3059    ///
3060    /// let content: XxxStateEventContent = todo!();
3061    /// joined_room.send_state_event_for_key("foo", content).await?;
3062    /// # anyhow::Ok(()) };
3063    /// ```
3064    ///
3065    /// [msc3414]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/3414-encrypted-state.md
3066    #[cfg(feature = "experimental-encrypted-state-events")]
3067    pub fn send_state_event_for_key<'a, C, K>(
3068        &'a self,
3069        state_key: &K,
3070        content: C,
3071    ) -> SendStateEvent<'a>
3072    where
3073        C: StateEventContent,
3074        C::StateKey: Borrow<K>,
3075        K: AsRef<str> + ?Sized,
3076    {
3077        SendStateEvent::new(self, state_key, content)
3078    }
3079
3080    /// Send a raw room state event to the homeserver.
3081    ///
3082    /// Returns the parsed response from the server.
3083    ///
3084    /// # Arguments
3085    ///
3086    /// * `event_type` - The type of the event that we're sending out.
3087    ///
3088    /// * `state_key` - A unique key which defines the overwriting semantics for
3089    /// this piece of room state. This value is often a zero-length string.
3090    ///
3091    /// * `content` - The content of the event as a raw JSON value. The argument
3092    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3093    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3094    ///
3095    /// # Examples
3096    ///
3097    /// ```no_run
3098    /// use serde_json::json;
3099    ///
3100    /// # async {
3101    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3102    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3103    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3104    ///
3105    /// if let Some(room) = client.get_room(&room_id) {
3106    ///     room.send_state_event_raw("m.room.member", "", json!({
3107    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3108    ///         "displayname": "Alice Margatroid",
3109    ///         "membership": "join",
3110    ///     })).await?;
3111    /// }
3112    /// # anyhow::Ok(()) };
3113    /// ```
3114    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3115    #[instrument(skip_all)]
3116    pub async fn send_state_event_raw(
3117        &self,
3118        event_type: &str,
3119        state_key: &str,
3120        content: impl IntoRawStateEventContent,
3121    ) -> Result<send_state_event::v3::Response> {
3122        self.ensure_room_joined()?;
3123
3124        let request = send_state_event::v3::Request::new_raw(
3125            self.room_id().to_owned(),
3126            event_type.into(),
3127            state_key.to_owned(),
3128            content.into_raw_state_event_content(),
3129        );
3130
3131        Ok(self.client.send(request).await?)
3132    }
3133
3134    /// Send a raw room state event to the homeserver.
3135    ///
3136    /// If the experimental state event encryption feature is enabled, this
3137    /// method will transparently encrypt the event if this room is
3138    /// encrypted (except if the event type is considered critical for the room
3139    /// to function, as outlined in [MSC3414][msc3414]).
3140    ///
3141    /// Returns the parsed response from the server.
3142    ///
3143    /// # Arguments
3144    ///
3145    /// * `event_type` - The type of the event that we're sending out.
3146    ///
3147    /// * `state_key` - A unique key which defines the overwriting semantics for
3148    /// this piece of room state. This value is often a zero-length string.
3149    ///
3150    /// * `content` - The content of the event as a raw JSON value. The argument
3151    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3152    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3153    ///
3154    /// # Examples
3155    ///
3156    /// ```no_run
3157    /// use serde_json::json;
3158    ///
3159    /// # async {
3160    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3161    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3162    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3163    ///
3164    /// if let Some(room) = client.get_room(&room_id) {
3165    ///     room.send_state_event_raw("m.room.member", "", json!({
3166    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3167    ///         "displayname": "Alice Margatroid",
3168    ///         "membership": "join",
3169    ///     })).await?;
3170    /// }
3171    /// # anyhow::Ok(()) };
3172    /// ```
3173    ///
3174    /// [msc3414]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/3414-encrypted-state.md
3175    #[cfg(feature = "experimental-encrypted-state-events")]
3176    #[instrument(skip_all)]
3177    pub fn send_state_event_raw<'a>(
3178        &'a self,
3179        event_type: &'a str,
3180        state_key: &'a str,
3181        content: impl IntoRawStateEventContent,
3182    ) -> SendRawStateEvent<'a> {
3183        SendRawStateEvent::new(self, event_type, state_key, content)
3184    }
3185
3186    /// Strips all information out of an event of the room.
3187    ///
3188    /// Returns the [`redact_event::v3::Response`] from the server.
3189    ///
3190    /// This cannot be undone. Users may redact their own events, and any user
3191    /// with a power level greater than or equal to the redact power level of
3192    /// the room may redact events there.
3193    ///
3194    /// # Arguments
3195    ///
3196    /// * `event_id` - The ID of the event to redact
3197    ///
3198    /// * `reason` - The reason for the event being redacted.
3199    ///
3200    /// * `txn_id` - A unique ID that can be attached to this event as
3201    /// its transaction ID. If not given one is created for the message.
3202    ///
3203    /// # Examples
3204    ///
3205    /// ```no_run
3206    /// use matrix_sdk::ruma::event_id;
3207    ///
3208    /// # async {
3209    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3210    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3211    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3212    /// #
3213    /// if let Some(room) = client.get_room(&room_id) {
3214    ///     let event_id = event_id!("$xxxxxx:example.org");
3215    ///     let reason = Some("Indecent material");
3216    ///     room.redact(&event_id, reason, None).await?;
3217    /// }
3218    /// # anyhow::Ok(()) };
3219    /// ```
3220    #[instrument(skip_all)]
3221    pub async fn redact(
3222        &self,
3223        event_id: &EventId,
3224        reason: Option<&str>,
3225        txn_id: Option<OwnedTransactionId>,
3226    ) -> HttpResult<redact_event::v3::Response> {
3227        let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3228        let request = assign!(
3229            redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3230            { reason: reason.map(ToOwned::to_owned) }
3231        );
3232
3233        self.client.send(request).await
3234    }
3235
3236    /// Get a list of servers that should know this room.
3237    ///
3238    /// Uses the synced members of the room and the suggested [routing
3239    /// algorithm] from the Matrix spec.
3240    ///
3241    /// Returns at most three servers.
3242    ///
3243    /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3244    pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3245        let acl_ev = self
3246            .get_state_event_static::<RoomServerAclEventContent>()
3247            .await?
3248            .and_then(|ev| ev.deserialize().ok());
3249        let acl = acl_ev.as_ref().and_then(|ev| match ev {
3250            SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3251            SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3252        });
3253
3254        // Filter out server names that:
3255        // - Are blocked due to server ACLs
3256        // - Are IP addresses
3257        let members: Vec<_> = self
3258            .members_no_sync(RoomMemberships::JOIN)
3259            .await?
3260            .into_iter()
3261            .filter(|member| {
3262                let server = member.user_id().server_name();
3263                acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3264            })
3265            .collect();
3266
3267        // Get the server of the highest power level user in the room, provided
3268        // they are at least power level 50.
3269        let max = members
3270            .iter()
3271            .max_by_key(|member| member.power_level())
3272            .filter(|max| max.power_level() >= int!(50))
3273            .map(|member| member.user_id().server_name());
3274
3275        // Sort the servers by population.
3276        let servers = members
3277            .iter()
3278            .map(|member| member.user_id().server_name())
3279            .filter(|server| max.filter(|max| max == server).is_none())
3280            .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3281                *servers.entry(server).or_default() += 1;
3282                servers
3283            });
3284        let mut servers: Vec<_> = servers.into_iter().collect();
3285        servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3286
3287        Ok(max
3288            .into_iter()
3289            .chain(servers.into_iter().map(|(name, _)| name))
3290            .take(3)
3291            .map(ToOwned::to_owned)
3292            .collect())
3293    }
3294
3295    /// Get a `matrix.to` permalink to this room.
3296    ///
3297    /// If this room has an alias, we use it. Otherwise, we try to use the
3298    /// synced members in the room for [routing] the room ID.
3299    ///
3300    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3301    pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3302        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3303            return Ok(alias.matrix_to_uri());
3304        }
3305
3306        let via = self.route().await?;
3307        Ok(self.room_id().matrix_to_uri_via(via))
3308    }
3309
3310    /// Get a `matrix:` permalink to this room.
3311    ///
3312    /// If this room has an alias, we use it. Otherwise, we try to use the
3313    /// synced members in the room for [routing] the room ID.
3314    ///
3315    /// # Arguments
3316    ///
3317    /// * `join` - Whether the user should join the room.
3318    ///
3319    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3320    pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3321        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3322            return Ok(alias.matrix_uri(join));
3323        }
3324
3325        let via = self.route().await?;
3326        Ok(self.room_id().matrix_uri_via(via, join))
3327    }
3328
3329    /// Get a `matrix.to` permalink to an event in this room.
3330    ///
3331    /// We try to use the synced members in the room for [routing] the room ID.
3332    ///
3333    /// *Note*: This method does not check if the given event ID is actually
3334    /// part of this room. It needs to be checked before calling this method
3335    /// otherwise the permalink won't work.
3336    ///
3337    /// # Arguments
3338    ///
3339    /// * `event_id` - The ID of the event.
3340    ///
3341    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3342    pub async fn matrix_to_event_permalink(
3343        &self,
3344        event_id: impl Into<OwnedEventId>,
3345    ) -> Result<MatrixToUri> {
3346        // Don't use the alias because an event is tied to a room ID, but an
3347        // alias might point to another room, e.g. after a room upgrade.
3348        let via = self.route().await?;
3349        Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3350    }
3351
3352    /// Get a `matrix:` permalink to an event in this room.
3353    ///
3354    /// We try to use the synced members in the room for [routing] the room ID.
3355    ///
3356    /// *Note*: This method does not check if the given event ID is actually
3357    /// part of this room. It needs to be checked before calling this method
3358    /// otherwise the permalink won't work.
3359    ///
3360    /// # Arguments
3361    ///
3362    /// * `event_id` - The ID of the event.
3363    ///
3364    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3365    pub async fn matrix_event_permalink(
3366        &self,
3367        event_id: impl Into<OwnedEventId>,
3368    ) -> Result<MatrixUri> {
3369        // Don't use the alias because an event is tied to a room ID, but an
3370        // alias might point to another room, e.g. after a room upgrade.
3371        let via = self.route().await?;
3372        Ok(self.room_id().matrix_event_uri_via(event_id, via))
3373    }
3374
3375    /// Get the latest receipt of a user in this room.
3376    ///
3377    /// # Arguments
3378    ///
3379    /// * `receipt_type` - The type of receipt to get.
3380    ///
3381    /// * `thread` - The thread containing the event of the receipt, if any.
3382    ///
3383    /// * `user_id` - The ID of the user.
3384    ///
3385    /// Returns the ID of the event on which the receipt applies and the
3386    /// receipt.
3387    pub async fn load_user_receipt(
3388        &self,
3389        receipt_type: ReceiptType,
3390        thread: ReceiptThread,
3391        user_id: &UserId,
3392    ) -> Result<Option<(OwnedEventId, Receipt)>> {
3393        self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3394    }
3395
3396    /// Load the receipts for an event in this room from storage.
3397    ///
3398    /// # Arguments
3399    ///
3400    /// * `receipt_type` - The type of receipt to get.
3401    ///
3402    /// * `thread` - The thread containing the event of the receipt, if any.
3403    ///
3404    /// * `event_id` - The ID of the event.
3405    ///
3406    /// Returns a list of IDs of users who have sent a receipt for the event and
3407    /// the corresponding receipts.
3408    pub async fn load_event_receipts(
3409        &self,
3410        receipt_type: ReceiptType,
3411        thread: ReceiptThread,
3412        event_id: &EventId,
3413    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3414        self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3415    }
3416
3417    /// Get the push-condition context for this room.
3418    ///
3419    /// Returns `None` if some data couldn't be found. This should only happen
3420    /// in brand new rooms, while we process its state.
3421    pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3422        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3423    }
3424
3425    /// Get the push-condition context for this room, with a choice to include
3426    /// thread subscriptions or not, based on the extra
3427    /// `with_threads_subscriptions` parameter.
3428    ///
3429    /// Returns `None` if some data couldn't be found. This should only happen
3430    /// in brand new rooms, while we process its state.
3431    pub(crate) async fn push_condition_room_ctx_internal(
3432        &self,
3433        with_threads_subscriptions: bool,
3434    ) -> Result<Option<PushConditionRoomCtx>> {
3435        let room_id = self.room_id();
3436        let user_id = self.own_user_id();
3437        let room_info = self.clone_info();
3438        let member_count = room_info.active_members_count();
3439
3440        let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3441            member.name().to_owned()
3442        } else {
3443            return Ok(None);
3444        };
3445
3446        let power_levels = match self.power_levels().await {
3447            Ok(power_levels) => Some(power_levels.into()),
3448            Err(error) => {
3449                if matches!(room_info.state(), RoomState::Joined) {
3450                    // It's normal to not have the power levels in a non-joined room, so don't log
3451                    // the error if the room is not joined
3452                    error!("Could not compute power levels for push conditions: {error}");
3453                }
3454                None
3455            }
3456        };
3457
3458        let mut ctx = assign!(PushConditionRoomCtx::new(
3459            room_id.to_owned(),
3460            UInt::new(member_count).unwrap_or(UInt::MAX),
3461            user_id.to_owned(),
3462            user_display_name,
3463        ),
3464        {
3465            power_levels,
3466        });
3467
3468        if with_threads_subscriptions {
3469            let this = self.clone();
3470            ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3471                let room = this.clone();
3472                Box::pin(async move {
3473                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3474                        maybe_sub.is_some()
3475                    } else {
3476                        false
3477                    }
3478                })
3479            });
3480        }
3481
3482        Ok(Some(ctx))
3483    }
3484
3485    /// Retrieves a [`PushContext`] that can be used to compute the push
3486    /// actions for events.
3487    pub async fn push_context(&self) -> Result<Option<PushContext>> {
3488        self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3489    }
3490
3491    /// Retrieves a [`PushContext`] that can be used to compute the push actions
3492    /// for events, with a choice to include thread subscriptions or not,
3493    /// based on the extra `with_threads_subscriptions` parameter.
3494    #[instrument(skip(self))]
3495    pub(crate) async fn push_context_internal(
3496        &self,
3497        with_threads_subscriptions: bool,
3498    ) -> Result<Option<PushContext>> {
3499        let Some(push_condition_room_ctx) =
3500            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3501        else {
3502            debug!("Could not aggregate push context");
3503            return Ok(None);
3504        };
3505        let push_rules = self.client().account().push_rules().await?;
3506        Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3507    }
3508
3509    /// Get the push actions for the given event with the current room state.
3510    ///
3511    /// Note that it is possible that no push action is returned because the
3512    /// current room state does not have all the required state events.
3513    pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3514        if let Some(ctx) = self.push_context().await? {
3515            Ok(Some(ctx.for_event(event).await))
3516        } else {
3517            Ok(None)
3518        }
3519    }
3520
3521    /// The membership details of the (latest) invite for the logged-in user in
3522    /// this room.
3523    pub async fn invite_details(&self) -> Result<Invite> {
3524        let state = self.state();
3525
3526        if state != RoomState::Invited {
3527            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3528        }
3529
3530        let invitee = self
3531            .get_member_no_sync(self.own_user_id())
3532            .await?
3533            .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3534        let event = invitee.event();
3535        let inviter_id = event.sender();
3536        let inviter = self.get_member_no_sync(inviter_id).await?;
3537        Ok(Invite { invitee, inviter })
3538    }
3539
3540    /// Get the membership details for the current user.
3541    ///
3542    /// Returns:
3543    ///     - If the user was present in the room, a
3544    ///       [`RoomMemberWithSenderInfo`] containing both the user info and the
3545    ///       member info of the sender of the `m.room.member` event.
3546    ///     - If the current user is not present, an error.
3547    pub async fn member_with_sender_info(
3548        &self,
3549        user_id: &UserId,
3550    ) -> Result<RoomMemberWithSenderInfo> {
3551        let Some(member) = self.get_member_no_sync(user_id).await? else {
3552            return Err(Error::InsufficientData);
3553        };
3554
3555        let sender_member =
3556            if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3557                // If the sender room member info is already available, return it
3558                Some(member)
3559            } else if self.are_members_synced() {
3560                // The room members are synced and we couldn't find the sender info
3561                None
3562            } else if self.sync_members().await.is_ok() {
3563                // Try getting the sender room member info again after syncing
3564                self.get_member_no_sync(member.event().sender()).await?
3565            } else {
3566                None
3567            };
3568
3569        Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3570    }
3571
3572    /// Forget this room.
3573    ///
3574    /// This communicates to the homeserver that it should forget the room.
3575    ///
3576    /// Only left or banned-from rooms can be forgotten.
3577    pub async fn forget(&self) -> Result<()> {
3578        let state = self.state();
3579        match state {
3580            RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3581                return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3582                    "Left / Banned",
3583                    state,
3584                ))));
3585            }
3586            RoomState::Left | RoomState::Banned => {}
3587        }
3588
3589        let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3590        let _response = self.client.send(request).await?;
3591
3592        // If it was a DM, remove the room from the `m.direct` global account data.
3593        if self.inner.direct_targets_length() != 0
3594            && let Err(e) = self.set_is_direct(false).await
3595        {
3596            // It is not important whether we managed to remove the room, it will not have
3597            // any consequences, so just log the error.
3598            warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3599        }
3600
3601        self.client.base_client().forget_room(self.inner.room_id()).await?;
3602
3603        Ok(())
3604    }
3605
3606    fn ensure_room_joined(&self) -> Result<()> {
3607        let state = self.state();
3608        if state == RoomState::Joined {
3609            Ok(())
3610        } else {
3611            Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3612        }
3613    }
3614
3615    /// Get the notification mode.
3616    pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3617        if !matches!(self.state(), RoomState::Joined) {
3618            return None;
3619        }
3620
3621        let notification_settings = self.client().notification_settings().await;
3622
3623        // Get the user-defined mode if available
3624        let notification_mode =
3625            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3626
3627        if notification_mode.is_some() {
3628            notification_mode
3629        } else if let Ok(is_encrypted) =
3630            self.latest_encryption_state().await.map(|state| state.is_encrypted())
3631        {
3632            // Otherwise, if encrypted status is available, get the default mode for this
3633            // type of room.
3634            // From the point of view of notification settings, a `one-to-one` room is one
3635            // that involves exactly two people.
3636            let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3637            let default_mode = notification_settings
3638                .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3639                .await;
3640            Some(default_mode)
3641        } else {
3642            None
3643        }
3644    }
3645
3646    /// Get the user-defined notification mode.
3647    ///
3648    /// The result is cached for fast and non-async call. To read the cached
3649    /// result, use
3650    /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3651    //
3652    // Note for maintainers:
3653    //
3654    // The fact the result is cached is an important property. If you change that in
3655    // the future, please review all calls to this method.
3656    pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3657        if !matches!(self.state(), RoomState::Joined) {
3658            return None;
3659        }
3660
3661        let notification_settings = self.client().notification_settings().await;
3662
3663        // Get the user-defined mode if available.
3664        let mode =
3665            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3666
3667        if let Some(mode) = mode {
3668            self.update_cached_user_defined_notification_mode(mode);
3669        }
3670
3671        mode
3672    }
3673
3674    /// Report an event as inappropriate to the homeserver's administrator.
3675    ///
3676    /// # Arguments
3677    ///
3678    /// * `event_id` - The ID of the event to report.
3679    /// * `score` - The score to rate this content.
3680    /// * `reason` - The reason the content is being reported.
3681    ///
3682    /// # Errors
3683    ///
3684    /// Returns an error if the room is not joined or if an error occurs with
3685    /// the request.
3686    pub async fn report_content(
3687        &self,
3688        event_id: OwnedEventId,
3689        score: Option<ReportedContentScore>,
3690        reason: Option<String>,
3691    ) -> Result<report_content::v3::Response> {
3692        let state = self.state();
3693        if state != RoomState::Joined {
3694            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3695        }
3696
3697        let request = report_content::v3::Request::new(
3698            self.inner.room_id().to_owned(),
3699            event_id,
3700            score.map(Into::into),
3701            reason,
3702        );
3703        Ok(self.client.send(request).await?)
3704    }
3705
3706    /// Reports a room as inappropriate to the server.
3707    /// The caller is not required to be joined to the room to report it.
3708    ///
3709    /// # Arguments
3710    ///
3711    /// * `reason` - The reason the room is being reported.
3712    ///
3713    /// # Errors
3714    ///
3715    /// Returns an error if the room is not found or on rate limit
3716    pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3717        let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3718
3719        Ok(self.client.send(request).await?)
3720    }
3721
3722    /// Set a flag on the room to indicate that the user has explicitly marked
3723    /// it as (un)read.
3724    ///
3725    /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3726    /// value as `unread`.
3727    pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3728        if self.is_marked_unread() == unread {
3729            // The request is not necessary.
3730            return Ok(());
3731        }
3732
3733        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3734
3735        let content = MarkedUnreadEventContent::new(unread);
3736
3737        let request = set_room_account_data::v3::Request::new(
3738            user_id.to_owned(),
3739            self.inner.room_id().to_owned(),
3740            &content,
3741        )?;
3742
3743        self.client.send(request).await?;
3744        Ok(())
3745    }
3746
3747    /// Returns the [`RoomEventCache`] associated to this room, assuming the
3748    /// global [`EventCache`] has been enabled for subscription.
3749    pub async fn event_cache(
3750        &self,
3751    ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3752        self.client.event_cache().for_room(self.room_id()).await
3753    }
3754
3755    /// Get the beacon information event in the room for the `user_id`.
3756    ///
3757    /// # Errors
3758    ///
3759    /// Returns an error if the event is redacted, stripped, not found or could
3760    /// not be deserialized.
3761    pub(crate) async fn get_user_beacon_info(
3762        &self,
3763        user_id: &UserId,
3764    ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3765        let raw_event = self
3766            .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3767            .await?
3768            .ok_or(BeaconError::NotFound)?;
3769
3770        match raw_event.deserialize()? {
3771            SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3772            SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3773            SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3774        }
3775    }
3776
3777    /// Start sharing live location in the room.
3778    ///
3779    /// # Arguments
3780    ///
3781    /// * `duration_millis` - The duration for which the live location is
3782    ///   shared, in milliseconds.
3783    /// * `description` - An optional description for the live location share.
3784    ///
3785    /// # Errors
3786    ///
3787    /// Returns an error if the room is not joined or if the state event could
3788    /// not be sent.
3789    pub async fn start_live_location_share(
3790        &self,
3791        duration_millis: u64,
3792        description: Option<String>,
3793    ) -> Result<send_state_event::v3::Response> {
3794        self.ensure_room_joined()?;
3795
3796        self.send_state_event_for_key(
3797            self.own_user_id(),
3798            BeaconInfoEventContent::new(
3799                description,
3800                Duration::from_millis(duration_millis),
3801                true,
3802                None,
3803            ),
3804        )
3805        .await
3806    }
3807
3808    /// Stop sharing live location in the room.
3809    ///
3810    /// # Errors
3811    ///
3812    /// Returns an error if the room is not joined, if the beacon information
3813    /// is redacted or stripped, or if the state event is not found.
3814    pub async fn stop_live_location_share(
3815        &self,
3816    ) -> Result<send_state_event::v3::Response, BeaconError> {
3817        self.ensure_room_joined()?;
3818
3819        let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3820        beacon_info_event.content.stop();
3821        Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3822    }
3823
3824    /// Send a location beacon event in the current room.
3825    ///
3826    /// # Arguments
3827    ///
3828    /// * `geo_uri` - The geo URI of the location beacon.
3829    ///
3830    /// # Errors
3831    ///
3832    /// Returns an error if the room is not joined, if the beacon information
3833    /// is redacted or stripped, if the location share is no longer live,
3834    /// or if the state event is not found.
3835    pub async fn send_location_beacon(
3836        &self,
3837        geo_uri: String,
3838    ) -> Result<send_message_event::v3::Response, BeaconError> {
3839        self.ensure_room_joined()?;
3840
3841        let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3842
3843        if beacon_info_event.content.is_live() {
3844            let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3845            Ok(self.send(content).await?)
3846        } else {
3847            Err(BeaconError::NotLive)
3848        }
3849    }
3850
3851    /// Store the given `ComposerDraft` in the state store using the current
3852    /// room id and optional thread root id as identifier.
3853    pub async fn save_composer_draft(
3854        &self,
3855        draft: ComposerDraft,
3856        thread_root: Option<&EventId>,
3857    ) -> Result<()> {
3858        self.client
3859            .state_store()
3860            .set_kv_data(
3861                StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3862                StateStoreDataValue::ComposerDraft(draft),
3863            )
3864            .await?;
3865        Ok(())
3866    }
3867
3868    /// Retrieve the `ComposerDraft` stored in the state store for this room
3869    /// and given thread, if any.
3870    pub async fn load_composer_draft(
3871        &self,
3872        thread_root: Option<&EventId>,
3873    ) -> Result<Option<ComposerDraft>> {
3874        let data = self
3875            .client
3876            .state_store()
3877            .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3878            .await?;
3879        Ok(data.and_then(|d| d.into_composer_draft()))
3880    }
3881
3882    /// Remove the `ComposerDraft` stored in the state store for this room
3883    /// and given thread, if any.
3884    pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3885        self.client
3886            .state_store()
3887            .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3888            .await?;
3889        Ok(())
3890    }
3891
3892    /// Load pinned state events for a room from the `/state` endpoint in the
3893    /// home server.
3894    pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3895        let response = self
3896            .client
3897            .send(get_state_event_for_key::v3::Request::new(
3898                self.room_id().to_owned(),
3899                StateEventType::RoomPinnedEvents,
3900                "".to_owned(),
3901            ))
3902            .await;
3903
3904        match response {
3905            Ok(response) => Ok(Some(
3906                response
3907                    .into_content()
3908                    .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3909                    .pinned,
3910            )),
3911            Err(http_error) => match http_error.as_client_api_error() {
3912                Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3913                _ => Err(http_error.into()),
3914            },
3915        }
3916    }
3917
3918    /// Observe live location sharing events for this room.
3919    ///
3920    /// The returned observable will receive the newest event for each sync
3921    /// response that contains an `m.beacon` event.
3922    ///
3923    /// Returns a stream of [`ObservableLiveLocation`] events from other users
3924    /// in the room, excluding the live location events of the room's own user.
3925    pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3926        ObservableLiveLocation::new(&self.client, self.room_id())
3927    }
3928
3929    /// Subscribe to knock requests in this `Room`.
3930    ///
3931    /// The current requests to join the room will be emitted immediately
3932    /// when subscribing.
3933    ///
3934    /// A new set of knock requests will be emitted whenever:
3935    /// - A new member event is received.
3936    /// - A knock request is marked as seen.
3937    /// - A sync is gappy (limited), so room membership information may be
3938    ///   outdated.
3939    ///
3940    /// Returns both a stream of knock requests and a handle for a task that
3941    /// will clean up the seen knock request ids when possible.
3942    pub async fn subscribe_to_knock_requests(
3943        &self,
3944    ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
3945        let this = Arc::new(self.clone());
3946
3947        let room_member_events_observer =
3948            self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3949
3950        let current_seen_ids = self.get_seen_knock_request_ids().await?;
3951        let mut seen_request_ids_stream = self
3952            .seen_knock_request_ids_map
3953            .subscribe()
3954            .await
3955            .map(|values| values.unwrap_or_default());
3956
3957        let mut room_info_stream = self.subscribe_info();
3958
3959        // Spawn a task that will clean up the seen knock request ids when updated room
3960        // members are received
3961        let clear_seen_ids_handle = spawn({
3962            let this = self.clone();
3963            async move {
3964                let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3965                while member_updates_stream.recv().await.is_ok() {
3966                    // If room members were updated, try to remove outdated seen knock request ids
3967                    if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3968                        warn!("Failed to remove seen knock requests: {err}")
3969                    }
3970                }
3971            }
3972        });
3973
3974        let combined_stream = stream! {
3975            // Emit current requests to join
3976            match this.get_current_join_requests(&current_seen_ids).await {
3977                Ok(initial_requests) => yield initial_requests,
3978                Err(err) => warn!("Failed to get initial requests to join: {err}")
3979            }
3980
3981            let mut requests_stream = room_member_events_observer.subscribe();
3982            let mut seen_ids = current_seen_ids.clone();
3983
3984            loop {
3985                // This is equivalent to a combine stream operation, triggering a new emission
3986                // when any of the branches changes
3987                tokio::select! {
3988                    Some((event, _)) = requests_stream.next() => {
3989                        if let Some(event) = event.as_original() {
3990                            // If we can calculate the membership change, try to emit only when needed
3991                            let emit = if event.prev_content().is_some() {
3992                                matches!(event.membership_change(),
3993                                    MembershipChange::Banned |
3994                                    MembershipChange::Knocked |
3995                                    MembershipChange::KnockAccepted |
3996                                    MembershipChange::KnockDenied |
3997                                    MembershipChange::KnockRetracted
3998                                )
3999                            } else {
4000                                // If we can't calculate the membership change, assume we need to
4001                                // emit updated values
4002                                true
4003                            };
4004
4005                            if emit {
4006                                match this.get_current_join_requests(&seen_ids).await {
4007                                    Ok(requests) => yield requests,
4008                                    Err(err) => {
4009                                        warn!("Failed to get updated knock requests on new member event: {err}")
4010                                    }
4011                                }
4012                            }
4013                        }
4014                    }
4015
4016                    Some(new_seen_ids) = seen_request_ids_stream.next() => {
4017                        // Update the current seen ids
4018                        seen_ids = new_seen_ids;
4019
4020                        // If seen requests have changed we need to recalculate
4021                        // all the knock requests
4022                        match this.get_current_join_requests(&seen_ids).await {
4023                            Ok(requests) => yield requests,
4024                            Err(err) => {
4025                                warn!("Failed to get updated knock requests on seen ids changed: {err}")
4026                            }
4027                        }
4028                    }
4029
4030                    Some(room_info) = room_info_stream.next() => {
4031                        // We need to emit new items when we may have missing room members:
4032                        // this usually happens after a gappy (limited) sync
4033                        if !room_info.are_members_synced() {
4034                            match this.get_current_join_requests(&seen_ids).await {
4035                                Ok(requests) => yield requests,
4036                                Err(err) => {
4037                                    warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4038                                }
4039                            }
4040                        }
4041                    }
4042                    // If the streams in all branches are closed, stop the loop
4043                    else => break,
4044                }
4045            }
4046        };
4047
4048        Ok((combined_stream, clear_seen_ids_handle))
4049    }
4050
4051    async fn get_current_join_requests(
4052        &self,
4053        seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4054    ) -> Result<Vec<KnockRequest>> {
4055        Ok(self
4056            .members(RoomMemberships::KNOCK)
4057            .await?
4058            .into_iter()
4059            .filter_map(|member| {
4060                let event_id = member.event().event_id()?;
4061                Some(KnockRequest::new(
4062                    self,
4063                    event_id,
4064                    member.event().timestamp(),
4065                    KnockRequestMemberInfo::from_member(&member),
4066                    seen_request_ids.contains_key(event_id),
4067                ))
4068            })
4069            .collect())
4070    }
4071
4072    /// Access the room settings related to privacy and visibility.
4073    pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4074        RoomPrivacySettings::new(&self.inner, &self.client)
4075    }
4076
4077    /// Retrieve a list of all the threads for the current room.
4078    ///
4079    /// Since this client-server API is paginated, the return type may include a
4080    /// token used to resuming back-pagination into the list of results, in
4081    /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4082    /// [`ListThreadsOptions::from`] to continue the pagination
4083    /// from the previous position.
4084    pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4085        let request = opts.into_request(self.room_id());
4086
4087        let response = self.client.send(request).await?;
4088
4089        let push_ctx = self.push_context().await?;
4090        let chunk = join_all(
4091            response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4092        )
4093        .await;
4094
4095        Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4096    }
4097
4098    /// Retrieve a list of relations for the given event, according to the given
4099    /// options.
4100    ///
4101    /// Since this client-server API is paginated, the return type may include a
4102    /// token used to resuming back-pagination into the list of results, in
4103    /// [`Relations::prev_batch_token`]. This token can be fed back into
4104    /// [`RelationsOptions::from`] to continue the pagination from the previous
4105    /// position.
4106    ///
4107    /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4108    /// then it must be used with the same
4109    /// [`RelationsOptions::include_relations`] value as the request that
4110    /// returns the `from` token, otherwise the server behavior is undefined.
4111    pub async fn relations(
4112        &self,
4113        event_id: OwnedEventId,
4114        opts: RelationsOptions,
4115    ) -> Result<Relations> {
4116        opts.send(self, event_id).await
4117    }
4118
4119    /// Search this room's [`RoomIndex`] for query and return at most
4120    /// max_number_of_results results.
4121    #[cfg(feature = "experimental-search")]
4122    pub async fn search(
4123        &self,
4124        query: &str,
4125        max_number_of_results: usize,
4126        pagination_offset: Option<usize>,
4127    ) -> Result<Vec<OwnedEventId>, IndexError> {
4128        let mut search_index_guard = self.client.search_index().lock().await;
4129        search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4130    }
4131
4132    /// Subscribe to a given thread in this room.
4133    ///
4134    /// This will subscribe the user to the thread, so that they will receive
4135    /// notifications for that thread specifically.
4136    ///
4137    /// # Arguments
4138    ///
4139    /// - `thread_root`: The ID of the thread root event to subscribe to.
4140    /// - `automatic`: Whether the subscription was made automatically by a
4141    ///   client, not by manual user choice. If set, must include the latest
4142    ///   event ID that's known in the thread and that is causing the automatic
4143    ///   subscription. If unset (i.e. we're now subscribing manually) and there
4144    ///   was a previous automatic subscription, the subscription will be
4145    ///   overridden to a manual one instead.
4146    ///
4147    /// # Returns
4148    ///
4149    /// - A 404 error if the event isn't known, or isn't a thread root.
4150    /// - An `Ok` result if the subscription was successful, or if the server
4151    ///   skipped an automatic subscription (as the user unsubscribed from the
4152    ///   thread after the event causing the automatic subscription).
4153    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4154    pub async fn subscribe_thread(
4155        &self,
4156        thread_root: OwnedEventId,
4157        automatic: Option<OwnedEventId>,
4158    ) -> Result<()> {
4159        let is_automatic = automatic.is_some();
4160
4161        match self
4162            .client
4163            .send(subscribe_thread::unstable::Request::new(
4164                self.room_id().to_owned(),
4165                thread_root.clone(),
4166                automatic,
4167            ))
4168            .await
4169        {
4170            Ok(_response) => {
4171                trace!("Server acknowledged the thread subscription; saving in db");
4172
4173                // Immediately save the result into the database.
4174                self.client
4175                    .state_store()
4176                    .upsert_thread_subscription(
4177                        self.room_id(),
4178                        &thread_root,
4179                        StoredThreadSubscription {
4180                            status: ThreadSubscriptionStatus::Subscribed {
4181                                automatic: is_automatic,
4182                            },
4183                            bump_stamp: None,
4184                        },
4185                    )
4186                    .await?;
4187
4188                Ok(())
4189            }
4190
4191            Err(err) => {
4192                if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4193                    // In this case: the server indicates that the user unsubscribed *after* the
4194                    // event ID we've used in an automatic subscription; don't
4195                    // save the subscription state in the database, as the
4196                    // previous one should be more correct.
4197                    trace!("Thread subscription skipped: {err}");
4198                    Ok(())
4199                } else {
4200                    // Forward the error to the caller.
4201                    Err(err.into())
4202                }
4203            }
4204        }
4205    }
4206
4207    /// Subscribe to a thread if needed, based on a current subscription to it.
4208    ///
4209    /// This is like [`Self::subscribe_thread`], but it first checks if the user
4210    /// has already subscribed to a thread, so as to minimize sending
4211    /// unnecessary subscriptions which would be ignored by the server.
4212    pub async fn subscribe_thread_if_needed(
4213        &self,
4214        thread_root: &EventId,
4215        automatic: Option<OwnedEventId>,
4216    ) -> Result<()> {
4217        if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4218            // If we have a previous subscription, we should only send the new one if it's
4219            // manual and the previous one was automatic.
4220            if !prev_sub.automatic || automatic.is_some() {
4221                // Either we had already a manual subscription, or we had an automatic one and
4222                // the new one is automatic too: nothing to do!
4223                return Ok(());
4224            }
4225        }
4226        self.subscribe_thread(thread_root.to_owned(), automatic).await
4227    }
4228
4229    /// Unsubscribe from a given thread in this room.
4230    ///
4231    /// # Arguments
4232    ///
4233    /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4234    ///
4235    /// # Returns
4236    ///
4237    /// - An `Ok` result if the unsubscription was successful, or the thread was
4238    ///   already unsubscribed.
4239    /// - A 404 error if the event isn't known, or isn't a thread root.
4240    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4241    pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4242        self.client
4243            .send(unsubscribe_thread::unstable::Request::new(
4244                self.room_id().to_owned(),
4245                thread_root.clone(),
4246            ))
4247            .await?;
4248
4249        trace!("Server acknowledged the thread subscription removal; removed it from db too");
4250
4251        // Immediately save the result into the database.
4252        self.client
4253            .state_store()
4254            .upsert_thread_subscription(
4255                self.room_id(),
4256                &thread_root,
4257                StoredThreadSubscription {
4258                    status: ThreadSubscriptionStatus::Unsubscribed,
4259                    bump_stamp: None,
4260                },
4261            )
4262            .await?;
4263
4264        Ok(())
4265    }
4266
4267    /// Return the current thread subscription for the given thread root in this
4268    /// room.
4269    ///
4270    /// # Arguments
4271    ///
4272    /// - `thread_root`: The ID of the thread root event to get the subscription
4273    ///   for.
4274    ///
4275    /// # Returns
4276    ///
4277    /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4278    ///   subscription information.
4279    /// - An `Ok` result with `None` if the subscription does not exist, or the
4280    ///   event couldn't be found, or the event isn't a thread.
4281    /// - An error if the request fails for any other reason, such as a network
4282    ///   error.
4283    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4284    pub async fn fetch_thread_subscription(
4285        &self,
4286        thread_root: OwnedEventId,
4287    ) -> Result<Option<ThreadSubscription>> {
4288        let result = self
4289            .client
4290            .send(get_thread_subscription::unstable::Request::new(
4291                self.room_id().to_owned(),
4292                thread_root.clone(),
4293            ))
4294            .await;
4295
4296        let subscription = match result {
4297            Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4298            Err(http_error) => match http_error.as_client_api_error() {
4299                Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4300                _ => return Err(http_error.into()),
4301            },
4302        };
4303
4304        // Keep the database in sync.
4305        if let Some(sub) = &subscription {
4306            self.client
4307                .state_store()
4308                .upsert_thread_subscription(
4309                    self.room_id(),
4310                    &thread_root,
4311                    StoredThreadSubscription {
4312                        status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4313                        bump_stamp: None,
4314                    },
4315                )
4316                .await?;
4317        } else {
4318            // If the subscription was not found, remove it from the database.
4319            self.client
4320                .state_store()
4321                .remove_thread_subscription(self.room_id(), &thread_root)
4322                .await?;
4323        }
4324
4325        Ok(subscription)
4326    }
4327
4328    /// Return the current thread subscription for the given thread root in this
4329    /// room, by getting it from storage if possible, or fetching it from
4330    /// network otherwise.
4331    ///
4332    /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4333    /// this method.
4334    pub async fn load_or_fetch_thread_subscription(
4335        &self,
4336        thread_root: &EventId,
4337    ) -> Result<Option<ThreadSubscription>> {
4338        // If the thread subscriptions list is outdated, fetch from the server.
4339        if self.client.thread_subscription_catchup().is_outdated() {
4340            return self.fetch_thread_subscription(thread_root.to_owned()).await;
4341        }
4342
4343        // Otherwise, we can rely on the store information.
4344        Ok(self
4345            .client
4346            .state_store()
4347            .load_thread_subscription(self.room_id(), thread_root)
4348            .await
4349            .map(|maybe_sub| {
4350                maybe_sub.and_then(|stored| match stored.status {
4351                    ThreadSubscriptionStatus::Unsubscribed => None,
4352                    ThreadSubscriptionStatus::Subscribed { automatic } => {
4353                        Some(ThreadSubscription { automatic })
4354                    }
4355                })
4356            })?)
4357    }
4358}
4359
4360#[cfg(feature = "e2e-encryption")]
4361impl RoomIdentityProvider for Room {
4362    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
4363        Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
4364    }
4365
4366    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
4367        Box::pin(async {
4368            let members = self
4369                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
4370                .await
4371                .unwrap_or_else(|_| Default::default());
4372
4373            let mut ret: Vec<UserIdentity> = Vec::new();
4374            for member in members {
4375                if let Some(i) = self.user_identity(member.user_id()).await {
4376                    ret.push(i);
4377                }
4378            }
4379            ret
4380        })
4381    }
4382
4383    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
4384        Box::pin(async {
4385            self.client
4386                .encryption()
4387                .get_user_identity(user_id)
4388                .await
4389                .unwrap_or(None)
4390                .map(|u| u.underlying_identity())
4391        })
4392    }
4393}
4394
4395/// A wrapper for a weak client and a room id that allows to lazily retrieve a
4396/// room, only when needed.
4397#[derive(Clone, Debug)]
4398pub(crate) struct WeakRoom {
4399    client: WeakClient,
4400    room_id: OwnedRoomId,
4401}
4402
4403impl WeakRoom {
4404    /// Create a new `WeakRoom` given its weak components.
4405    pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4406        Self { client, room_id }
4407    }
4408
4409    /// Attempts to reconstruct the room.
4410    pub fn get(&self) -> Option<Room> {
4411        self.client.get().and_then(|client| client.get_room(&self.room_id))
4412    }
4413
4414    /// The room id for that room.
4415    pub fn room_id(&self) -> &RoomId {
4416        &self.room_id
4417    }
4418}
4419
4420/// Details of the (latest) invite.
4421#[derive(Debug, Clone)]
4422pub struct Invite {
4423    /// Who has been invited.
4424    pub invitee: RoomMember,
4425    /// Who sent the invite.
4426    pub inviter: Option<RoomMember>,
4427}
4428
4429#[derive(Error, Debug)]
4430enum InvitationError {
4431    #[error("No membership event found")]
4432    EventMissing,
4433}
4434
4435/// Receipts to send all at once.
4436#[derive(Debug, Clone, Default)]
4437#[non_exhaustive]
4438pub struct Receipts {
4439    /// Fully-read marker (room account data).
4440    pub fully_read: Option<OwnedEventId>,
4441    /// Read receipt (public ephemeral room event).
4442    pub public_read_receipt: Option<OwnedEventId>,
4443    /// Read receipt (private ephemeral room event).
4444    pub private_read_receipt: Option<OwnedEventId>,
4445}
4446
4447impl Receipts {
4448    /// Create an empty `Receipts`.
4449    pub fn new() -> Self {
4450        Self::default()
4451    }
4452
4453    /// Set the last event the user has read.
4454    ///
4455    /// It means that the user has read all the events before this event.
4456    ///
4457    /// This is a private marker only visible by the user.
4458    ///
4459    /// Note that this is technically not a receipt as it is persisted in the
4460    /// room account data.
4461    pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4462        self.fully_read = event_id.into();
4463        self
4464    }
4465
4466    /// Set the last event presented to the user and forward it to the other
4467    /// users in the room.
4468    ///
4469    /// This is used to reset the unread messages/notification count and
4470    /// advertise to other users the last event that the user has likely seen.
4471    pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4472        self.public_read_receipt = event_id.into();
4473        self
4474    }
4475
4476    /// Set the last event presented to the user and don't forward it.
4477    ///
4478    /// This is used to reset the unread messages/notification count.
4479    pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4480        self.private_read_receipt = event_id.into();
4481        self
4482    }
4483
4484    /// Whether this `Receipts` is empty.
4485    pub fn is_empty(&self) -> bool {
4486        self.fully_read.is_none()
4487            && self.public_read_receipt.is_none()
4488            && self.private_read_receipt.is_none()
4489    }
4490}
4491
4492/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
4493/// listed by a room, possibly validated by checking the space's state.
4494#[derive(Debug)]
4495pub enum ParentSpace {
4496    /// The room recognizes the given room as its parent, and the parent
4497    /// recognizes it as its child.
4498    Reciprocal(Room),
4499    /// The room recognizes the given room as its parent, but the parent does
4500    /// not recognizes it as its child. However, the author of the
4501    /// `m.space.parent` event in the room has a sufficient power level in the
4502    /// parent to create the child event.
4503    WithPowerlevel(Room),
4504    /// The room recognizes the given room as its parent, but the parent does
4505    /// not recognizes it as its child.
4506    Illegitimate(Room),
4507    /// The room recognizes the given id as its parent room, but we cannot check
4508    /// whether the parent recognizes it as its child.
4509    Unverifiable(OwnedRoomId),
4510}
4511
4512/// The score to rate an inappropriate content.
4513///
4514/// Must be a value between `0`, inoffensive, and `-100`, very offensive.
4515#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
4516pub struct ReportedContentScore(i8);
4517
4518impl ReportedContentScore {
4519    /// The smallest value that can be represented by this type.
4520    ///
4521    /// This is for very offensive content.
4522    pub const MIN: Self = Self(-100);
4523
4524    /// The largest value that can be represented by this type.
4525    ///
4526    /// This is for inoffensive content.
4527    pub const MAX: Self = Self(0);
4528
4529    /// Try to create a `ReportedContentScore` from the provided `i8`.
4530    ///
4531    /// Returns `None` if it is smaller than [`ReportedContentScore::MIN`] or
4532    /// larger than [`ReportedContentScore::MAX`] .
4533    ///
4534    /// This is the same as the `TryFrom<i8>` implementation for
4535    /// `ReportedContentScore`, except that it returns an `Option` instead
4536    /// of a `Result`.
4537    pub fn new(value: i8) -> Option<Self> {
4538        value.try_into().ok()
4539    }
4540
4541    /// Create a `ReportedContentScore` from the provided `i8` clamped to the
4542    /// acceptable interval.
4543    ///
4544    /// The given value gets clamped into the closed interval between
4545    /// [`ReportedContentScore::MIN`] and [`ReportedContentScore::MAX`].
4546    pub fn new_saturating(value: i8) -> Self {
4547        if value > Self::MAX {
4548            Self::MAX
4549        } else if value < Self::MIN {
4550            Self::MIN
4551        } else {
4552            Self(value)
4553        }
4554    }
4555
4556    /// The value of this score.
4557    pub fn value(&self) -> i8 {
4558        self.0
4559    }
4560}
4561
4562impl PartialEq<i8> for ReportedContentScore {
4563    fn eq(&self, other: &i8) -> bool {
4564        self.0.eq(other)
4565    }
4566}
4567
4568impl PartialEq<ReportedContentScore> for i8 {
4569    fn eq(&self, other: &ReportedContentScore) -> bool {
4570        self.eq(&other.0)
4571    }
4572}
4573
4574impl PartialOrd<i8> for ReportedContentScore {
4575    fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4576        self.0.partial_cmp(other)
4577    }
4578}
4579
4580impl PartialOrd<ReportedContentScore> for i8 {
4581    fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4582        self.partial_cmp(&other.0)
4583    }
4584}
4585
4586impl From<ReportedContentScore> for Int {
4587    fn from(value: ReportedContentScore) -> Self {
4588        value.0.into()
4589    }
4590}
4591
4592impl TryFrom<i8> for ReportedContentScore {
4593    type Error = TryFromReportedContentScoreError;
4594
4595    fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4596        if value > Self::MAX || value < Self::MIN {
4597            Err(TryFromReportedContentScoreError(()))
4598        } else {
4599            Ok(Self(value))
4600        }
4601    }
4602}
4603
4604impl TryFrom<i16> for ReportedContentScore {
4605    type Error = TryFromReportedContentScoreError;
4606
4607    fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4608        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4609        value.try_into()
4610    }
4611}
4612
4613impl TryFrom<i32> for ReportedContentScore {
4614    type Error = TryFromReportedContentScoreError;
4615
4616    fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4617        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4618        value.try_into()
4619    }
4620}
4621
4622impl TryFrom<i64> for ReportedContentScore {
4623    type Error = TryFromReportedContentScoreError;
4624
4625    fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4626        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4627        value.try_into()
4628    }
4629}
4630
4631impl TryFrom<Int> for ReportedContentScore {
4632    type Error = TryFromReportedContentScoreError;
4633
4634    fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4635        let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4636        value.try_into()
4637    }
4638}
4639
4640trait EventSource {
4641    fn get_event(
4642        &self,
4643        event_id: &EventId,
4644    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4645}
4646
4647impl EventSource for &Room {
4648    async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4649        self.load_or_fetch_event(event_id, None).await
4650    }
4651}
4652
4653/// The error type returned when a checked `ReportedContentScore` conversion
4654/// fails.
4655#[derive(Debug, Clone, Error)]
4656#[error("out of range conversion attempted")]
4657pub struct TryFromReportedContentScoreError(());
4658
4659/// Contains the current user's room member info and the optional room member
4660/// info of the sender of the `m.room.member` event that this info represents.
4661#[derive(Debug)]
4662pub struct RoomMemberWithSenderInfo {
4663    /// The actual room member.
4664    pub room_member: RoomMember,
4665    /// The info of the sender of the event `room_member` is based on, if
4666    /// available.
4667    pub sender_info: Option<RoomMember>,
4668}
4669
4670#[cfg(all(test, not(target_family = "wasm")))]
4671mod tests {
4672    use std::collections::BTreeMap;
4673
4674    use matrix_sdk_base::{ComposerDraft, store::ComposerDraftType};
4675    use matrix_sdk_test::{
4676        JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
4677        event_factory::EventFactory, test_json,
4678    };
4679    use ruma::{
4680        RoomVersionId, event_id,
4681        events::{relation::RelationType, room::member::MembershipState},
4682        int, owned_event_id, room_id, user_id,
4683    };
4684    use wiremock::{
4685        Mock, MockServer, ResponseTemplate,
4686        matchers::{header, method, path_regex},
4687    };
4688
4689    use super::ReportedContentScore;
4690    use crate::{
4691        Client,
4692        config::RequestConfig,
4693        room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4694        test_utils::{
4695            client::mock_matrix_session,
4696            logged_in_client,
4697            mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4698        },
4699    };
4700
4701    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4702    #[async_test]
4703    async fn test_cache_invalidation_while_encrypt() {
4704        use matrix_sdk_base::store::RoomLoadSettings;
4705        use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4706
4707        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4708        let session = mock_matrix_session();
4709
4710        let client = Client::builder()
4711            .homeserver_url("http://localhost:1234")
4712            .request_config(RequestConfig::new().disable_retry())
4713            .sqlite_store(&sqlite_path, None)
4714            .build()
4715            .await
4716            .unwrap();
4717        client
4718            .matrix_auth()
4719            .restore_session(session.clone(), RoomLoadSettings::default())
4720            .await
4721            .unwrap();
4722
4723        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4724
4725        // Mock receiving an event to create an internal room.
4726        let server = MockServer::start().await;
4727        {
4728            Mock::given(method("GET"))
4729                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4730                .and(header("authorization", "Bearer 1234"))
4731                .respond_with(
4732                    ResponseTemplate::new(200)
4733                        .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
4734                )
4735                .mount(&server)
4736                .await;
4737            let response = SyncResponseBuilder::default()
4738                .add_joined_room(
4739                    JoinedRoomBuilder::default()
4740                        .add_state_event(StateTestEvent::Member)
4741                        .add_state_event(StateTestEvent::PowerLevels)
4742                        .add_state_event(StateTestEvent::Encryption),
4743                )
4744                .build_sync_response();
4745            client.base_client().receive_sync_response(response).await.unwrap();
4746        }
4747
4748        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4749
4750        // Step 1, preshare the room keys.
4751        room.preshare_room_key().await.unwrap();
4752
4753        // Step 2, force lock invalidation by pretending another client obtained the
4754        // lock.
4755        {
4756            let client = Client::builder()
4757                .homeserver_url("http://localhost:1234")
4758                .request_config(RequestConfig::new().disable_retry())
4759                .sqlite_store(&sqlite_path, None)
4760                .build()
4761                .await
4762                .unwrap();
4763            client
4764                .matrix_auth()
4765                .restore_session(session.clone(), RoomLoadSettings::default())
4766                .await
4767                .unwrap();
4768            client
4769                .encryption()
4770                .enable_cross_process_store_lock("client2".to_owned())
4771                .await
4772                .unwrap();
4773
4774            let guard = client.encryption().spin_lock_store(None).await.unwrap();
4775            assert!(guard.is_some());
4776        }
4777
4778        // Step 3, take the crypto-store lock.
4779        let guard = client.encryption().spin_lock_store(None).await.unwrap();
4780        assert!(guard.is_some());
4781
4782        // Step 4, try to encrypt a message.
4783        let olm = client.olm_machine().await;
4784        let olm = olm.as_ref().expect("Olm machine wasn't started");
4785
4786        // Now pretend we're encrypting an event; the olm machine shouldn't rely on
4787        // caching the outgoing session before.
4788        let _encrypted_content = olm
4789            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4790            .await
4791            .unwrap();
4792    }
4793
4794    #[test]
4795    fn reported_content_score() {
4796        // i8
4797        let score = ReportedContentScore::new(0).unwrap();
4798        assert_eq!(score.value(), 0);
4799        let score = ReportedContentScore::new(-50).unwrap();
4800        assert_eq!(score.value(), -50);
4801        let score = ReportedContentScore::new(-100).unwrap();
4802        assert_eq!(score.value(), -100);
4803        assert_eq!(ReportedContentScore::new(10), None);
4804        assert_eq!(ReportedContentScore::new(-110), None);
4805
4806        let score = ReportedContentScore::new_saturating(0);
4807        assert_eq!(score.value(), 0);
4808        let score = ReportedContentScore::new_saturating(-50);
4809        assert_eq!(score.value(), -50);
4810        let score = ReportedContentScore::new_saturating(-100);
4811        assert_eq!(score.value(), -100);
4812        let score = ReportedContentScore::new_saturating(10);
4813        assert_eq!(score, ReportedContentScore::MAX);
4814        let score = ReportedContentScore::new_saturating(-110);
4815        assert_eq!(score, ReportedContentScore::MIN);
4816
4817        // i16
4818        let score = ReportedContentScore::try_from(0i16).unwrap();
4819        assert_eq!(score.value(), 0);
4820        let score = ReportedContentScore::try_from(-100i16).unwrap();
4821        assert_eq!(score.value(), -100);
4822        ReportedContentScore::try_from(10i16).unwrap_err();
4823        ReportedContentScore::try_from(-110i16).unwrap_err();
4824
4825        // i32
4826        let score = ReportedContentScore::try_from(0i32).unwrap();
4827        assert_eq!(score.value(), 0);
4828        let score = ReportedContentScore::try_from(-100i32).unwrap();
4829        assert_eq!(score.value(), -100);
4830        ReportedContentScore::try_from(10i32).unwrap_err();
4831        ReportedContentScore::try_from(-110i32).unwrap_err();
4832
4833        // i64
4834        let score = ReportedContentScore::try_from(0i64).unwrap();
4835        assert_eq!(score.value(), 0);
4836        let score = ReportedContentScore::try_from(-100i64).unwrap();
4837        assert_eq!(score.value(), -100);
4838        ReportedContentScore::try_from(10i64).unwrap_err();
4839        ReportedContentScore::try_from(-110i64).unwrap_err();
4840
4841        // Int
4842        let score = ReportedContentScore::try_from(int!(0)).unwrap();
4843        assert_eq!(score.value(), 0);
4844        let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4845        assert_eq!(score.value(), -100);
4846        ReportedContentScore::try_from(int!(10)).unwrap_err();
4847        ReportedContentScore::try_from(int!(-110)).unwrap_err();
4848    }
4849
4850    #[async_test]
4851    async fn test_composer_draft() {
4852        use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4853
4854        let client = logged_in_client(None).await;
4855
4856        let response = SyncResponseBuilder::default()
4857            .add_joined_room(JoinedRoomBuilder::default())
4858            .build_sync_response();
4859        client.base_client().receive_sync_response(response).await.unwrap();
4860        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4861
4862        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4863
4864        // Save 2 drafts, one for the room and one for a thread.
4865
4866        let draft = ComposerDraft {
4867            plain_text: "Hello, world!".to_owned(),
4868            html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4869            draft_type: ComposerDraftType::NewMessage,
4870        };
4871
4872        room.save_composer_draft(draft.clone(), None).await.unwrap();
4873
4874        let thread_root = owned_event_id!("$thread_root:b.c");
4875        let thread_draft = ComposerDraft {
4876            plain_text: "Hello, thread!".to_owned(),
4877            html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4878            draft_type: ComposerDraftType::NewMessage,
4879        };
4880
4881        room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4882
4883        // Check that the room draft was saved correctly
4884        assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4885
4886        // Check that the thread draft was saved correctly
4887        assert_eq!(
4888            room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4889            Some(thread_draft.clone())
4890        );
4891
4892        // Clear the room draft
4893        room.clear_composer_draft(None).await.unwrap();
4894        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4895
4896        // Check that the thread one is still there
4897        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4898
4899        // Clear the thread draft as well
4900        room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4901        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4902    }
4903
4904    #[async_test]
4905    async fn test_mark_join_requests_as_seen() {
4906        let server = MatrixMockServer::new().await;
4907        let client = server.client_builder().build().await;
4908        let event_id = event_id!("$a:b.c");
4909        let room_id = room_id!("!a:b.c");
4910        let user_id = user_id!("@alice:b.c");
4911
4912        let f = EventFactory::new().room(room_id);
4913        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4914            f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into_raw(),
4915        ]);
4916        let room = server.sync_room(&client, joined_room_builder).await;
4917
4918        // When loading the initial seen ids, there are none
4919        let seen_ids =
4920            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4921        assert!(seen_ids.is_empty());
4922
4923        // We mark a random event id as seen
4924        room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4925            .await
4926            .expect("Couldn't mark join request as seen");
4927
4928        // Then we can check it was successfully marked as seen
4929        let seen_ids =
4930            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4931        assert_eq!(seen_ids.len(), 1);
4932        assert_eq!(
4933            seen_ids.into_iter().next().expect("No next value"),
4934            (event_id.to_owned(), user_id.to_owned())
4935        )
4936    }
4937
4938    #[async_test]
4939    async fn test_own_room_membership_with_no_own_member_event() {
4940        let server = MatrixMockServer::new().await;
4941        let client = server.client_builder().build().await;
4942        let room_id = room_id!("!a:b.c");
4943
4944        let room = server.sync_joined_room(&client, room_id).await;
4945
4946        // Since there is no member event for the own user, the method fails.
4947        // This should never happen in an actual room.
4948        let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4949        assert!(error.is_some());
4950    }
4951
4952    #[async_test]
4953    async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4954        let server = MatrixMockServer::new().await;
4955        let client = server.client_builder().build().await;
4956        let room_id = room_id!("!a:b.c");
4957        let user_id = user_id!("@example:localhost");
4958
4959        let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4960        let joined_room_builder =
4961            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4962        let room = server.sync_room(&client, joined_room_builder).await;
4963
4964        // When we load the membership details
4965        let ret = room
4966            .member_with_sender_info(client.user_id().unwrap())
4967            .await
4968            .expect("Room member info should be available");
4969
4970        // We get the member info for the current user
4971        assert_eq!(ret.room_member.event().user_id(), user_id);
4972
4973        // But there is no info for the sender
4974        assert!(ret.sender_info.is_none());
4975    }
4976
4977    #[async_test]
4978    async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4979        let server = MatrixMockServer::new().await;
4980        let client = server.client_builder().build().await;
4981        let room_id = room_id!("!a:b.c");
4982        let user_id = user_id!("@example:localhost");
4983
4984        let f = EventFactory::new().room(room_id).sender(user_id);
4985        let joined_room_builder =
4986            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4987        let room = server.sync_room(&client, joined_room_builder).await;
4988
4989        // When we load the membership details
4990        let ret = room
4991            .member_with_sender_info(client.user_id().unwrap())
4992            .await
4993            .expect("Room member info should be available");
4994
4995        // We get the current user's member info
4996        assert_eq!(ret.room_member.event().user_id(), user_id);
4997
4998        // And the sender has the same info, since it's also the current user
4999        assert!(ret.sender_info.is_some());
5000        assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5001    }
5002
5003    #[async_test]
5004    async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5005        let server = MatrixMockServer::new().await;
5006        let client = server.client_builder().build().await;
5007        let room_id = room_id!("!a:b.c");
5008        let user_id = user_id!("@example:localhost");
5009        let sender_id = user_id!("@alice:b.c");
5010
5011        let f = EventFactory::new().room(room_id).sender(sender_id);
5012        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5013            f.member(user_id).into_raw(),
5014            // The sender info comes from the sync
5015            f.member(sender_id).into_raw(),
5016        ]);
5017        let room = server.sync_room(&client, joined_room_builder).await;
5018
5019        // When we load the membership details
5020        let ret = room
5021            .member_with_sender_info(client.user_id().unwrap())
5022            .await
5023            .expect("Room member info should be available");
5024
5025        // We get the current user's member info
5026        assert_eq!(ret.room_member.event().user_id(), user_id);
5027
5028        // And also the sender info from the events received in the sync
5029        assert!(ret.sender_info.is_some());
5030        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5031    }
5032
5033    #[async_test]
5034    async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5035        let server = MatrixMockServer::new().await;
5036        let client = server.client_builder().build().await;
5037        let room_id = room_id!("!a:b.c");
5038        let user_id = user_id!("@example:localhost");
5039        let sender_id = user_id!("@alice:b.c");
5040
5041        let f = EventFactory::new().room(room_id).sender(sender_id);
5042        let joined_room_builder =
5043            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
5044        let room = server.sync_room(&client, joined_room_builder).await;
5045
5046        // We'll receive the member info through the /members endpoint
5047        server
5048            .mock_get_members()
5049            .ok(vec![f.member(sender_id).into_raw()])
5050            .mock_once()
5051            .mount()
5052            .await;
5053
5054        // We get the current user's member info
5055        let ret = room
5056            .member_with_sender_info(client.user_id().unwrap())
5057            .await
5058            .expect("Room member info should be available");
5059
5060        // We get the current user's member info
5061        assert_eq!(ret.room_member.event().user_id(), user_id);
5062
5063        // And also the sender info from the /members endpoint
5064        assert!(ret.sender_info.is_some());
5065        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5066    }
5067
5068    #[async_test]
5069    async fn test_list_threads() {
5070        let server = MatrixMockServer::new().await;
5071        let client = server.client_builder().build().await;
5072
5073        let room_id = room_id!("!a:b.c");
5074        let sender_id = user_id!("@alice:b.c");
5075        let f = EventFactory::new().room(room_id).sender(sender_id);
5076
5077        let eid1 = event_id!("$1");
5078        let eid2 = event_id!("$2");
5079        let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5080        let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5081
5082        server
5083            .mock_room_threads()
5084            .ok(batch1.clone(), Some("prev_batch".to_owned()))
5085            .mock_once()
5086            .mount()
5087            .await;
5088        server
5089            .mock_room_threads()
5090            .match_from("prev_batch")
5091            .ok(batch2, None)
5092            .mock_once()
5093            .mount()
5094            .await;
5095
5096        let room = server.sync_joined_room(&client, room_id).await;
5097        let result =
5098            room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5099        assert_eq!(result.chunk.len(), 1);
5100        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5101        assert!(result.prev_batch_token.is_some());
5102
5103        let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5104        let result = room.list_threads(opts).await.expect("Failed to list threads");
5105        assert_eq!(result.chunk.len(), 1);
5106        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5107        assert!(result.prev_batch_token.is_none());
5108    }
5109
5110    #[async_test]
5111    async fn test_relations() {
5112        let server = MatrixMockServer::new().await;
5113        let client = server.client_builder().build().await;
5114
5115        let room_id = room_id!("!a:b.c");
5116        let sender_id = user_id!("@alice:b.c");
5117        let f = EventFactory::new().room(room_id).sender(sender_id);
5118
5119        let target_event_id = owned_event_id!("$target");
5120        let eid1 = event_id!("$1");
5121        let eid2 = event_id!("$2");
5122        let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5123        let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5124
5125        server
5126            .mock_room_relations()
5127            .match_target_event(target_event_id.clone())
5128            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5129            .mock_once()
5130            .mount()
5131            .await;
5132
5133        server
5134            .mock_room_relations()
5135            .match_target_event(target_event_id.clone())
5136            .match_from("next_batch")
5137            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5138            .mock_once()
5139            .mount()
5140            .await;
5141
5142        let room = server.sync_joined_room(&client, room_id).await;
5143
5144        // Main endpoint: no relation type filtered out.
5145        let mut opts = RelationsOptions {
5146            include_relations: IncludeRelations::AllRelations,
5147            ..Default::default()
5148        };
5149        let result = room
5150            .relations(target_event_id.clone(), opts.clone())
5151            .await
5152            .expect("Failed to list relations the first time");
5153        assert_eq!(result.chunk.len(), 1);
5154        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5155        assert!(result.prev_batch_token.is_none());
5156        assert!(result.next_batch_token.is_some());
5157        assert!(result.recursion_depth.is_none());
5158
5159        opts.from = result.next_batch_token;
5160        let result = room
5161            .relations(target_event_id, opts)
5162            .await
5163            .expect("Failed to list relations the second time");
5164        assert_eq!(result.chunk.len(), 1);
5165        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5166        assert!(result.prev_batch_token.is_none());
5167        assert!(result.next_batch_token.is_none());
5168        assert!(result.recursion_depth.is_none());
5169    }
5170
5171    #[async_test]
5172    async fn test_relations_with_reltype() {
5173        let server = MatrixMockServer::new().await;
5174        let client = server.client_builder().build().await;
5175
5176        let room_id = room_id!("!a:b.c");
5177        let sender_id = user_id!("@alice:b.c");
5178        let f = EventFactory::new().room(room_id).sender(sender_id);
5179
5180        let target_event_id = owned_event_id!("$target");
5181        let eid1 = event_id!("$1");
5182        let eid2 = event_id!("$2");
5183        let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5184        let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5185
5186        server
5187            .mock_room_relations()
5188            .match_target_event(target_event_id.clone())
5189            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5190            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5191            .mock_once()
5192            .mount()
5193            .await;
5194
5195        server
5196            .mock_room_relations()
5197            .match_target_event(target_event_id.clone())
5198            .match_from("next_batch")
5199            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5200            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5201            .mock_once()
5202            .mount()
5203            .await;
5204
5205        let room = server.sync_joined_room(&client, room_id).await;
5206
5207        // Reltype-filtered endpoint, for threads \o/
5208        let mut opts = RelationsOptions {
5209            include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5210            ..Default::default()
5211        };
5212        let result = room
5213            .relations(target_event_id.clone(), opts.clone())
5214            .await
5215            .expect("Failed to list relations the first time");
5216        assert_eq!(result.chunk.len(), 1);
5217        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5218        assert!(result.prev_batch_token.is_none());
5219        assert!(result.next_batch_token.is_some());
5220        assert!(result.recursion_depth.is_none());
5221
5222        opts.from = result.next_batch_token;
5223        let result = room
5224            .relations(target_event_id, opts)
5225            .await
5226            .expect("Failed to list relations the second time");
5227        assert_eq!(result.chunk.len(), 1);
5228        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5229        assert!(result.prev_batch_token.is_none());
5230        assert!(result.next_batch_token.is_none());
5231        assert!(result.recursion_depth.is_none());
5232    }
5233
5234    #[async_test]
5235    async fn test_power_levels_computation() {
5236        let server = MatrixMockServer::new().await;
5237        let client = server.client_builder().build().await;
5238
5239        let room_id = room_id!("!a:b.c");
5240        let sender_id = client.user_id().expect("No session id");
5241        let f = EventFactory::new().room(room_id).sender(sender_id);
5242        let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5243
5244        // Computing the power levels will need these 3 state events:
5245        let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into_raw();
5246        let power_levels_event = f.power_levels(&mut user_map).state_key("").into_raw();
5247        let room_member_event = f.member(sender_id).into_raw();
5248
5249        // With only the room member event
5250        let room = server
5251            .sync_room(
5252                &client,
5253                JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event.clone()]),
5254            )
5255            .await;
5256        let ctx = room
5257            .push_condition_room_ctx()
5258            .await
5259            .expect("Failed to get push condition context")
5260            .expect("Could not get push condition context");
5261
5262        // The internal power levels couldn't be computed
5263        assert!(ctx.power_levels.is_none());
5264
5265        // Adding the room creation event
5266        let room = server
5267            .sync_room(
5268                &client,
5269                JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event.clone()]),
5270            )
5271            .await;
5272        let ctx = room
5273            .push_condition_room_ctx()
5274            .await
5275            .expect("Failed to get push condition context")
5276            .expect("Could not get push condition context");
5277
5278        // The internal power levels still couldn't be computed
5279        assert!(ctx.power_levels.is_none());
5280
5281        // With the room member, room creation and the power levels events
5282        let room = server
5283            .sync_room(
5284                &client,
5285                JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5286            )
5287            .await;
5288        let ctx = room
5289            .push_condition_room_ctx()
5290            .await
5291            .expect("Failed to get push condition context")
5292            .expect("Could not get push condition context");
5293
5294        // The internal power levels can finally be computed
5295        assert!(ctx.power_levels.is_some());
5296    }
5297}